+ Neural network CLI + Hidden Markov Model CLI + K-Means clustering CLI + Linear regression CLI + Screenshots, updated README instructions
		
			
				
	
	
		
			650 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			650 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
################################################################################
 | 
						|
# Author: Shaun Reed                                                           #
 | 
						|
# About: ANN implementation with adjustable layers and layer lengths           #
 | 
						|
# Contact: shaunrd0@gmail.com  | URL: www.shaunreed.com  | GitHub: shaunrd0    #
 | 
						|
################################################################################
 | 
						|
 | 
						|
from matplotlib import pyplot as plt
 | 
						|
from sklearn.datasets import load_iris
 | 
						|
from typing import List
 | 
						|
import argparse
 | 
						|
import json
 | 
						|
import math
 | 
						|
import numpy as np
 | 
						|
import pandas as pd  # Unused unless optional code is manually uncommented
 | 
						|
import random
 | 
						|
import sys
 | 
						|
import viznet as vn
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
# CLI Argument Parser
 | 
						|
################################################################################
 | 
						|
 | 
						|
# ==============================================================================
 | 
						|
 | 
						|
def init_parser():
 | 
						|
    parser = argparse.ArgumentParser(
 | 
						|
        description='Neural network implementation',
 | 
						|
        formatter_class=argparse.RawTextHelpFormatter
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        'inputs', metavar='INPUTS', type=int, nargs='?',
 | 
						|
        help=
 | 
						|
        '''Number of inputs for the neural network
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=3
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        'perceptrons', metavar='PERCEPTRONS', type=int, nargs='?',
 | 
						|
        help=
 | 
						|
        '''Number of perceptrons in each hidden layer
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=8
 | 
						|
    )
 | 
						|
    
 | 
						|
    parser.add_argument(
 | 
						|
        'outputs', metavar='OUTPUTS', type=int, nargs='?',
 | 
						|
        help=
 | 
						|
        '''Number of outputs for the neural network
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=3
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--hidden-layers', '-l', metavar='HIDDEN_LAYERS', type=int, nargs='?',
 | 
						|
        help=
 | 
						|
        '''Number of hidden layers
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=1
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--cycles', '-c', metavar='CYCLES', type=int, nargs='?',
 | 
						|
        help=
 | 
						|
        '''Number of cycles to run through the network
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=3
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--learn-rate', metavar='LEARNING_RATE', type=float, nargs='?',
 | 
						|
        help=
 | 
						|
        '''Learning rate to use for the network.
 | 
						|
    Must be within range of 0.0 < rate <= 1.0
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=0.25
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--bias', '-b', metavar='INITIAL_BIAS', type=float, nargs='?',
 | 
						|
        help=
 | 
						|
        '''The initial bias to use for perceptrons within the network.
 | 
						|
    Must be within range of -1.0 <= bias <= 1.0
 | 
						|
    If value is unset, bias will be initialized randomly
 | 
						|
         ''',
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--weight', '-w', metavar='INITIAL_EDGE_WEIGHTS', type=float, nargs='?',
 | 
						|
        help=
 | 
						|
        '''The initial edge weight to use for node connections in the network
 | 
						|
    If value is unset, edge weights will be initialized randomly
 | 
						|
        '''
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--error-threshold', '--error', metavar='ERROR_THRESHOLD', type=float, nargs='?',
 | 
						|
        help=
 | 
						|
        '''The acceptable error threshold to use for training the network.
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=0.5
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--fire-threshold', '--fire', metavar='FIRE_THRESHOLD', type=float, nargs='?',
 | 
						|
        help=
 | 
						|
        '''The fire threshold for perceptrons in the network.
 | 
						|
    If a perceptron\'s cumulative inputs reach this value, the perceptron fires
 | 
						|
    (default: '%(default)s')
 | 
						|
    ''',
 | 
						|
        default=0.25
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--spacing', metavar='LAYER_SPACING', type=float, nargs='?',
 | 
						|
        help=
 | 
						|
        '''Distance between origin of network layers within visualization
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=2.0
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--horizontal', '--flip', action='store_true',
 | 
						|
        help=
 | 
						|
        '''The network visualization will flow left-to-right
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=False
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--silent', action='store_true',
 | 
						|
        help=
 | 
						|
        '''Do not show the network visualization, only print output to console
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=False
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--verbose', '-v', action='store_true',
 | 
						|
        help=
 | 
						|
        '''When this flag is set, error rate and change in weight will be output for each calculation
 | 
						|
    (default: '%(default)s')
 | 
						|
        ''',
 | 
						|
        default=False
 | 
						|
    )
 | 
						|
 | 
						|
    parser.add_argument(
 | 
						|
        '--file', '-f', metavar='file_path', nargs='?', type=open,
 | 
						|
        help=
 | 
						|
        '''Optionally provide a json file to configure any option available through the cli
 | 
						|
    json keys match --long version of each option, where --long-split option key is "long_split" in json
 | 
						|
        ''',
 | 
						|
    )
 | 
						|
 | 
						|
    return parser
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
# Neural Network
 | 
						|
################################################################################
 | 
						|
 | 
						|
# ==============================================================================
 | 
						|
 | 
						|
def parse_file():
 | 
						|
    """
 | 
						|
    Validates keys in JSON file and updates CLI input context
 | 
						|
 | 
						|
    :return: (seq_input, seq_label) Initialized to input and label sequences in JSON file if present
 | 
						|
    """
 | 
						|
 | 
						|
    # Load the JSON input file, validate keys
 | 
						|
    file_data = json.load(context['file'])
 | 
						|
    for key in file_data:
 | 
						|
        if key == "input_sequence" or key == "label_sequence":
 | 
						|
            continue
 | 
						|
        assert key in context
 | 
						|
    # Update the CLI context with JSON input
 | 
						|
    context.update(file_data)
 | 
						|
 | 
						|
    # If JSON file provided input and label sequences, load and return them
 | 
						|
    seq_input = seq_label = None
 | 
						|
    if 'input_sequence' in file_data:
 | 
						|
        seq_input = np.array(file_data['input_sequence'])
 | 
						|
    if 'label_sequence' in file_data:
 | 
						|
        seq_label = np.array(file_data['label_sequence'])
 | 
						|
    return seq_input, seq_label
 | 
						|
 | 
						|
 | 
						|
def network_layers():
 | 
						|
    """
 | 
						|
    Initialize a dictionary of layers where each layer is a list of nodes: {'input': [0, 1, 2]}
 | 
						|
    The hidden layer in this dictionary is a list of lists for each hidden layer: {'hidden': [[3, 4, 5], [6, 7, 8]]}
 | 
						|
 | 
						|
    :return: A dictionary, as an example: {'input': [0, 1, 2], 'hidden': [[3, 4, 5], [6, 7, 8]], 'output': [9, 10, 11] }
 | 
						|
    """
 | 
						|
    inputs = [i for i in range(context["inputs"])]
 | 
						|
    offset = context["inputs"]
 | 
						|
 | 
						|
    # For each hidden layer add the requested number of perceptrons
 | 
						|
    hidden = [[] for x in range(context["hidden_layers"])]
 | 
						|
    for x in range(context["hidden_layers"]):
 | 
						|
        hidden[x] = [i for i in range(offset, context["perceptrons"] + offset)]
 | 
						|
        offset += context["perceptrons"]
 | 
						|
 | 
						|
    outputs = [i for i in range(offset, context["outputs"] + offset)]
 | 
						|
    offset += context["outputs"]
 | 
						|
 | 
						|
    layers = {"inputs": inputs, "hidden": hidden, "outputs": outputs}
 | 
						|
    [print(f'{layer} layer: {layers[layer]}') for layer in layers]
 | 
						|
    return layers
 | 
						|
 | 
						|
 | 
						|
def random_matrix(rows, cols, low=-1.0, high=1.0):
 | 
						|
    """ Produce a random matrix of size ROWSxCOLS using LOW and HIGH as upper and lower bounds """
 | 
						|
    return np.random.uniform(low, high, (rows, cols))
 | 
						|
 | 
						|
 | 
						|
def get_matrix_dict():
 | 
						|
    """
 | 
						|
    Produces a dictionary that holds edge weight transition matrices for each layer of the network
 | 
						|
    matrix_dict['input'] maps to a single 2D matrix
 | 
						|
 | 
						|
    matrix_dict['hidden'] maps to a 3D matrix
 | 
						|
    + where matrix_dict['hidden'][0] is the 2D transition matrix for the first hidden layer
 | 
						|
 | 
						|
    :return: A dictionary, as an example: {'input': [[...]], 'hidden': [[[...]], [[...]]], 'output': [[...]] }
 | 
						|
    """
 | 
						|
    if context["weight"] is None:
 | 
						|
        # Create matrices to represent edges and weights for each layer of the network
 | 
						|
        input_matrix = random_matrix(context["inputs"], context["perceptrons"])
 | 
						|
        hidden_matrices = [random_matrix(context["perceptrons"], context["perceptrons"])
 | 
						|
                           for x in range(context["hidden_layers"]-1)]
 | 
						|
        output_matrix = random_matrix(context["perceptrons"], context["outputs"])
 | 
						|
    else:
 | 
						|
        # If an initial edge weight was specified, fill matrices with that value instead of generating randomly
 | 
						|
        input_matrix = np.full((context["inputs"], context["perceptrons"]), context["weight"])
 | 
						|
        hidden_matrices = [np.full((context["perceptrons"], context["perceptrons"]), context["weight"])
 | 
						|
                           for x in range(context["hidden_layers"]-1)]
 | 
						|
        output_matrix = np.full((context["perceptrons"], context["outputs"]), context["weight"])
 | 
						|
 | 
						|
    matrix_dict = {'input': input_matrix, 'hidden': np.array(hidden_matrices), 'output': output_matrix}
 | 
						|
    return matrix_dict
 | 
						|
 | 
						|
 | 
						|
def get_bias_dict():
 | 
						|
    """
 | 
						|
    Produces a dictionary that stores bias vectors for perceptrons in each layer of the network
 | 
						|
    The hidden layer in this dictionary is a list of lists for bias in each hidden layer: {'hidden': [[...], [...]]}
 | 
						|
 | 
						|
    :return: A dictionary, as an example: {'input': [0.5, 0.5], 'hidden': [[0.5, 0.5 0.5], ...], 'output': [...] }
 | 
						|
    """
 | 
						|
    # If there was a bias provided, use it; Else use random perceptron bias
 | 
						|
    bias = round(random.uniform(-1.0, 1.0), 2) if context["bias"] is None else context["bias"]
 | 
						|
    # Create vectors to represent perceptron bias in each layer
 | 
						|
    input_bias = [bias for x in range(0, context["inputs"])]
 | 
						|
    hidden_bias = [[bias for x in range(0, context["perceptrons"])] for x in range(0, context["hidden_layers"])]
 | 
						|
    output_bias = [bias for x in range(0, context["outputs"])]
 | 
						|
    bias_dict = {'input': input_bias, 'hidden': hidden_bias, 'output': output_bias}
 | 
						|
    return bias_dict
 | 
						|
 | 
						|
 | 
						|
def threshold_fire(input_sum):
 | 
						|
    """
 | 
						|
    Applies step function using fire_threshold set by CLI to determine if perceptron is firing or not
 | 
						|
 | 
						|
    :param input_sum: The sum of inputs for this perceptron
 | 
						|
    :return: A list of outputs for each perceptron in the layer. If only the first fired: [1, 0, 0, 0]
 | 
						|
    """
 | 
						|
    output = [1 if val > context["fire_threshold"] else 0 for val in input_sum.tolist()]
 | 
						|
    return output
 | 
						|
 | 
						|
 | 
						|
def adjust_weight(matrix_dict, out_output, label):
 | 
						|
    """
 | 
						|
    Back propagation for adjusting edge weights of nodes that produces incorrect output
 | 
						|
 | 
						|
    :param matrix_dict: A dictionary of matrices for the network produces by get_matrix_dict()
 | 
						|
    :param out_output: The actual output for this input sequence
 | 
						|
    :param label: The desired result for this input sequence
 | 
						|
    :return: A dictionary of transition matrices for the network with adjusted edge weights
 | 
						|
    """
 | 
						|
    # Find erroneous indices
 | 
						|
    bad_nodes = error_nodes(out_output, label)
 | 
						|
    if len(bad_nodes) == 0:
 | 
						|
        return matrix_dict
 | 
						|
 | 
						|
    # Adjust the edge weights leading to the error nodes; Don't adjust correct nodes
 | 
						|
    for layer, mat in reversed(matrix_dict.items()):
 | 
						|
        if layer == 'output':
 | 
						|
            for node in bad_nodes:
 | 
						|
                for row in range(len(mat)):
 | 
						|
                    mod = context['learn_rate'] * (label[node] - out_output[node])  # * Input (???)
 | 
						|
                    if context['verbose']:
 | 
						|
                        print(f'Adjusting output weights at ({row}, {node}) with {mod}')
 | 
						|
                    mat[row][node] += mod
 | 
						|
 | 
						|
        # In a fully connected neural network, all edges are updated if any output node is wrong
 | 
						|
        # + Every node of every layer connects to every node in the next layer
 | 
						|
        # + Any wrong node updates all edges in previous layers
 | 
						|
        if layer == 'hidden':
 | 
						|
            # If there are any hidden layers that do not connect to input or output layers directly
 | 
						|
            if mat.size > 0:
 | 
						|
                # For each hidden layer matrix, update all edge weights
 | 
						|
                for i, hl_mat in enumerate(mat):
 | 
						|
                    for row in range(len(hl_mat)):
 | 
						|
                        mod = context['learn_rate']
 | 
						|
                        for col in range(len(hl_mat[row])):
 | 
						|
                            # print(f'Adjusting output weights at ({row}, {col}) with {mod}')
 | 
						|
                            mat[i][row][col] += context["learn_rate"]
 | 
						|
 | 
						|
        if layer == 'input':
 | 
						|
            for row in range(len(mat)):
 | 
						|
                mod = context['learn_rate']
 | 
						|
                for col in range(len(mat[row])):
 | 
						|
                    # print(f'Adjusting output weights at ({row}, {col}) with {mod}')
 | 
						|
                    mat[row][col] += mod
 | 
						|
    return matrix_dict
 | 
						|
 | 
						|
 | 
						|
def error_rate(actual_output, label):
 | 
						|
    """
 | 
						|
    Determines error rate for this input sequence
 | 
						|
    Error rate is later used to determine if edge weights should be adjusted
 | 
						|
 | 
						|
    :param actual_output: The actual output for this input sequence
 | 
						|
    :param label: The desired output for this input sequence
 | 
						|
    :return: The error rate for the sequence
 | 
						|
    """
 | 
						|
    error_sum = 0
 | 
						|
    for n, output in enumerate(actual_output):
 | 
						|
        err = label[n] - output
 | 
						|
        error_sum += math.pow(err, 2)
 | 
						|
    err = math.sqrt(error_sum)
 | 
						|
    return err
 | 
						|
 | 
						|
 | 
						|
def error_nodes(out_output, label):
 | 
						|
    """
 | 
						|
    Find which output nodes are incorrect
 | 
						|
 | 
						|
    :param out_output: Actual output for this input sequence
 | 
						|
    :param label: The desired output for this input sequence
 | 
						|
    :return: A list of node indices that produced the wrong output for this sequence
 | 
						|
    """
 | 
						|
    # Loop through each output, check if it matches the label; If it doesn't add index to returned list
 | 
						|
    return [i for i, output in enumerate(out_output) if output != label[i]]
 | 
						|
 | 
						|
 | 
						|
def layer_pass(weight_matrix, input_vector, bias_vector):
 | 
						|
    """
 | 
						|
    Passes input from layer A to layer B
 | 
						|
 | 
						|
    :param weight_matrix: Transition matrix of edge weights where perceptrons from layer A are rows and B are columns
 | 
						|
    :param input_vector: An input vector that represents the output from A to B
 | 
						|
    :param bias_vector: The bias vector for perceptrons in layer B
 | 
						|
    :return: Final output from the layer, after step function is applied in threshold_fire()
 | 
						|
    """
 | 
						|
    layer_edge_weights = np.array(weight_matrix).T
 | 
						|
    prev_output = np.atleast_2d(input_vector).T
 | 
						|
    this_layer_input = layer_edge_weights.dot(prev_output).T.flatten()
 | 
						|
    this_layer_input += np.array(bias_vector)
 | 
						|
 | 
						|
    return threshold_fire(this_layer_input)
 | 
						|
 | 
						|
 | 
						|
def train_network(seq_input, seq_label, bias_dict, matrix_dict):
 | 
						|
    """
 | 
						|
    Performs forward pass through network, moving through the number of cycles requested by the CLI
 | 
						|
 | 
						|
    :param seq_input: Sequence of inputs to feed into the network
 | 
						|
    :param seq_label: Sequence of labels to verify network output and indicate error
 | 
						|
    :param bias_dict: Dictionary of bias vectors for the perceptrons in each layer
 | 
						|
    :param matrix_dict: Dictionary of transition matrices for the edge weights between layers in the network
 | 
						|
    :return: Information gathered from training the network used to output final accuracy
 | 
						|
    """
 | 
						|
    # Info dictionary used to track accuracy
 | 
						|
    info = {'correct': 0, 'wrong': 0, 'total': len(seq_input) * context["cycles"], 'first_acc': 0}
 | 
						|
 | 
						|
    # A list of error rates for each cycle
 | 
						|
    # + These aren't used much for the program, but they hold nice data to explore while debugging
 | 
						|
    cycle_errors = []
 | 
						|
    cycle_outputs = [[] for x in range(context["cycles"])]
 | 
						|
    for cycle_index in range(1, context["cycles"] + 1):
 | 
						|
        # print(f'\nCycle number {cycle_index}')
 | 
						|
        for seq_index in range(0, len(seq_input)):
 | 
						|
            # One list for storing the outputs of each layer, and another to store inputs
 | 
						|
            seq_outputs = []
 | 
						|
 | 
						|
            # Input layer -> Hidden layer
 | 
						|
            # Apply input perceptron bias vector to initial inputs of the input layer
 | 
						|
            in_input = np.array(np.array(seq_input[seq_index]) + np.array(bias_dict['input']))
 | 
						|
            # Find output of the input layer
 | 
						|
            in_output = threshold_fire(in_input)
 | 
						|
            seq_outputs.append(in_output)
 | 
						|
 | 
						|
            # Find output for first hidden layer
 | 
						|
            hl_output = layer_pass(matrix_dict["input"], seq_outputs[-1], bias_dict['hidden'][0])
 | 
						|
            seq_outputs.append(hl_output)
 | 
						|
 | 
						|
            # For each hidden layer find inputs and outputs, up until the last hidden layer
 | 
						|
            # + Start at 1 since we already have the output from first hidden layer
 | 
						|
            for layer_index in range(1, context["hidden_layers"]):
 | 
						|
                # Hidden layer -> Hidden layer
 | 
						|
                edges = matrix_dict['hidden'][layer_index - 1]
 | 
						|
                bias = bias_dict['hidden'][layer_index - 1]
 | 
						|
                # Find output for hidden layer N
 | 
						|
                hl_output = layer_pass(edges, seq_outputs[-1], bias)
 | 
						|
                seq_outputs.append(hl_output)
 | 
						|
 | 
						|
            # Hidden layer -> Output layer
 | 
						|
            # Find output for output layer
 | 
						|
            out_output = layer_pass(matrix_dict['output'], seq_outputs[-1], bias_dict['output'])
 | 
						|
            seq_outputs.append(out_output)
 | 
						|
 | 
						|
            # Forward pass through network finished
 | 
						|
            # Find error rate for this input sequence
 | 
						|
            err = error_rate(out_output, seq_label[seq_index])
 | 
						|
 | 
						|
            if context['verbose'] and err > 0:
 | 
						|
                print(f'Error rate for sequence {seq_index} cycle {cycle_index}: {err}')
 | 
						|
            # If error rate for this sequence is above threshold, adjust weighted edges
 | 
						|
            if err > context["error_threshold"]:
 | 
						|
                matrix_dict = adjust_weight(matrix_dict, out_output, seq_label[seq_index])
 | 
						|
 | 
						|
            # Track correctness of sequences and cycles
 | 
						|
            if err == 0:
 | 
						|
                info['correct'] += 1
 | 
						|
            else:
 | 
						|
                info['wrong'] += 1
 | 
						|
 | 
						|
            # Append the result to the cycle_outputs list for this cycle; -1 for 0 index array offset
 | 
						|
            # cycle_outputs contains a list for each cycle. Each list contains N outputs for N input sequences
 | 
						|
            cycle_outputs[cycle_index - 1].append(out_output)
 | 
						|
            cycle_errors.append(err)
 | 
						|
 | 
						|
        # Move to next learning cycle in for loop
 | 
						|
        info_total_temp = info['correct'] + info['wrong']
 | 
						|
        if cycle_index == 1:
 | 
						|
            info['first_acc'] = round(100.0 * float(info["correct"] / info_total_temp), 4)
 | 
						|
        print(
 | 
						|
            f'[Cycle {cycle_index}]   \tAccuracy: {100.0 * float(info["correct"] / info_total_temp):.4f}% \t'
 | 
						|
            f'[{info["correct"]} / {info["wrong"]}]'
 | 
						|
        )
 | 
						|
        if context["verbose"]:
 | 
						|
            for layer in matrix_dict:
 | 
						|
                print(
 | 
						|
                    f'Network {layer} layer: \n{matrix_dict[layer]}\n'
 | 
						|
                    # Bias vector doesn't change, so it's not very interesting output per-cycle
 | 
						|
                    # f'{layer} bias vector: {bias_dict[layer]}'
 | 
						|
                )
 | 
						|
 | 
						|
    info['cycle_error'] = cycle_errors
 | 
						|
    return info
 | 
						|
 | 
						|
 | 
						|
def draw_graph(net_plot, net_layers, draw_horizontal=None, spacing=None):
 | 
						|
    """
 | 
						|
    This is the only function where viznet is used. Viznet is a module to visualize network graphs using matplotlib.
 | 
						|
    https://viznet.readthedocs.io/en/latest/core.html
 | 
						|
    https://viznet.readthedocs.io/en/latest/examples.html#examples
 | 
						|
 | 
						|
    To draw the graph, we need to at least specify the following information for-each layer in the network -
 | 
						|
        1. The number of nodes in the layer
 | 
						|
        2. The type of nodes that make up each layer (https://viznet.readthedocs.io/en/latest/viznet.theme.html)
 | 
						|
        3. The distance between the center (origin) of each layer
 | 
						|
    With this we can use viznet helper functions to draw network
 | 
						|
 | 
						|
    :param net_plot: A matplotlib subplot to draw the network on
 | 
						|
    :param net_layers: A dictionary of layers that make up the network nodes
 | 
						|
    :param draw_horizontal: True if graph should be drawn so direction flows left->right; False for bottom->top
 | 
						|
    :param spacing: The distance between the center of origin for each layer in the network
 | 
						|
    """
 | 
						|
    # If no spacing was provided to the call, use spacing set by CLI
 | 
						|
    spacing = context["spacing"] if spacing is None else spacing
 | 
						|
    # If no draw mode was provided to the call, use mode set by CLI
 | 
						|
    draw_horizontal = context["horizontal"] if draw_horizontal is None else draw_horizontal
 | 
						|
 | 
						|
    # 1. Number of nodes in each layer is provided by dictionary: len(net_layers['input'])
 | 
						|
 | 
						|
    # 2. Define node type to draw for each layer in the network (default ['nn.input', 'nn.hidden', nn.output])
 | 
						|
    node_types = ['nn.input'] + ['nn.hidden'] * context["hidden_layers"] + ['nn.output']
 | 
						|
 | 
						|
    # 3. Use spacing distance to create list of X positions with equal distance apart (default  [0, 1.5, 3.0])
 | 
						|
    # 1.5 * 0 = 0; 1.5 * 1 = 1.5; 1.5 * 2 = 3.0; 1.5 * 3 = 4.5; etc
 | 
						|
    layer_pos = spacing * np.arange(context["hidden_layers"] + 2)
 | 
						|
 | 
						|
    # Create a sequence of Node objects using viznet helper function node_sequence
 | 
						|
    # + Allows defining a NodeBrush for-each node, which is used by the library to style nodes
 | 
						|
    node_sequence = []
 | 
						|
    layer_index = 0
 | 
						|
    for layer in net_layers:
 | 
						|
        # If we are on the hidden layers, iterate through each
 | 
						|
        if layer == 'hidden':
 | 
						|
            for hl in net_layers[layer]:
 | 
						|
                brush = vn.NodeBrush(node_types[layer_index], net_plot)
 | 
						|
                ctr = (layer_pos[layer_index], 0) if draw_horizontal else (0, layer_pos[layer_index])
 | 
						|
                node_sequence.append(vn.node_sequence(
 | 
						|
                    brush, len(hl),
 | 
						|
                    center=ctr, space=(0, 1) if draw_horizontal else (1, 0))
 | 
						|
                )
 | 
						|
                layer_index += 1
 | 
						|
        else:
 | 
						|
            brush = vn.NodeBrush(node_types[layer_index], net_plot)
 | 
						|
            ctr = (layer_pos[layer_index], 0) if draw_horizontal else (0, layer_pos[layer_index])
 | 
						|
            node_sequence.append(vn.node_sequence(
 | 
						|
                brush, len(net_layers[layer]),
 | 
						|
                center=ctr, space=(0, 1) if draw_horizontal else (1, 0))
 | 
						|
            )
 | 
						|
            layer_index += 1
 | 
						|
 | 
						|
    # Define an EdgeBrush that draws arrows between nodes using matplotlib axes
 | 
						|
    edge_brush = vn.EdgeBrush('-->', net_plot)
 | 
						|
    for start, end in zip(node_sequence[:-1], node_sequence[1:]):
 | 
						|
        # Connect each node in `start` layer to each node in `end` layer
 | 
						|
        for start_node in start:
 | 
						|
            for end_node in end:
 | 
						|
                # Apply the EdgeBrush using matplotlib axes and node edge tuple
 | 
						|
                edge_brush >> (start_node, end_node)
 | 
						|
 | 
						|
    plt.show()
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
# Main
 | 
						|
################################################################################
 | 
						|
 | 
						|
# ==============================================================================
 | 
						|
 | 
						|
def main(args: List[str]):
 | 
						|
    parser = init_parser()
 | 
						|
    global context
 | 
						|
    context = vars(parser.parse_args(args[1:]))
 | 
						|
    seq_input = None
 | 
						|
    seq_label = None
 | 
						|
    if context['file']:
 | 
						|
        seq_input, seq_label = parse_file()
 | 
						|
 | 
						|
    if seq_input is None or seq_label is None:
 | 
						|
        # You cannot provide input or label sequences via the CLI
 | 
						|
        # If no file was provided with data, use iris dataset as example data
 | 
						|
 | 
						|
        # Use sklearn.dataset to grab example data
 | 
						|
        iris = load_iris()
 | 
						|
        # iris_data = iris.data[:, (0, 1, 2, 3)]
 | 
						|
        iris_data = iris.data[:, (0, 2, 3)]
 | 
						|
        # iris_data = iris.data[:, (2, 3)]
 | 
						|
        iris_label = iris.target
 | 
						|
 | 
						|
        # Or read a CSV manually using pandas
 | 
						|
        # iris = pd.read_csv('/home/kapper/Code/School/CS/AI/Assignment/two/IRIS.csv').to_dict()
 | 
						|
        # iris_data = [[x, y] for x, y in zip(iris['petal_length'].values(), iris['petal_width'].values())]
 | 
						|
        # iris_data = [[x, y, z] for x, y, z in zip(iris['petal_length'].values(),
 | 
						|
        #                                           iris['petal_width'].values(),
 | 
						|
        #                                           iris['sepal_length'].values())]
 | 
						|
        # iris_label = [x for x in iris['species'].values()]
 | 
						|
 | 
						|
        # To change the number of output nodes, we need to adjust the number of labels for classification
 | 
						|
        # iris_data = iris.data[0:99, (0, 2, 3)]
 | 
						|
        # iris_label = [l for l in iris_label if l != 2]
 | 
						|
 | 
						|
        # Convert labels to: 0-> [1, 0, 0]; 1-> [0, 1, 0]; 2->[0, 0, 1]
 | 
						|
        seq_input = iris_data
 | 
						|
        seq_label = []
 | 
						|
        for i, label in enumerate(set(iris_label)):
 | 
						|
            same = [s for s in iris_label if s == label]
 | 
						|
            for l in same:
 | 
						|
                new_label = np.zeros(len(set(iris_label))).tolist()
 | 
						|
                new_label[i] = 1
 | 
						|
                seq_label.append(new_label)
 | 
						|
 | 
						|
    # Assert that the provided learning rate is valid
 | 
						|
    assert(0.0 < context['learn_rate'] <= 1.0)
 | 
						|
 | 
						|
    # This check ensures that the number of inputs match the number of input nodes
 | 
						|
    # + And does the same for output nodes with possible classifications
 | 
						|
    # + But, this removes the ability to grow / shrink input / output layers through CLI
 | 
						|
    if context["inputs"] != len(seq_input[0]):
 | 
						|
        print(f'Warning: Input sequences each contain {len(seq_input[0])} entries '
 | 
						|
              f'but {context["inputs"]} input nodes were requested.\n'
 | 
						|
              f'\tUsing {len(seq_input[0])} input nodes instead of {context["inputs"]}'
 | 
						|
              )
 | 
						|
        context["inputs"] = len(seq_input[0])
 | 
						|
    if context["outputs"] != len(set(map(tuple, seq_label))):
 | 
						|
        print(f'Warning: Output labels contain {len(set(map(tuple, seq_label)))} possible classifications '
 | 
						|
              f'but {context["outputs"]} output were nodes requested.\n'
 | 
						|
              f'\tUsing {len(set(map(tuple, seq_label)))} output nodes instead of {context["outputs"]}'
 | 
						|
              )
 | 
						|
        context["outputs"] = len(set(map(tuple, seq_label)))
 | 
						|
 | 
						|
    # Output the problem settings
 | 
						|
    print(f'Creating a single layer neural network: \n'
 | 
						|
          f'\tTotal input nodes: {context["inputs"]}\n'
 | 
						|
          f'\tNumber of perceptrons in each hidden layer: {context["perceptrons"]}\n'
 | 
						|
          f'\tTotal output nodes: {context["outputs"]}\n'
 | 
						|
          f'\tNumber of hidden layers: {context["hidden_layers"]}\n'
 | 
						|
          f'\tFire threshold: {context["fire_threshold"]}\n'
 | 
						|
          f'\tError threshold: {context["error_threshold"]}\n'
 | 
						|
          f'\tLearn rate: {context["learn_rate"]}\n'
 | 
						|
          f'\tInitial bias: {context["bias"] if context["bias"] is not None else "Random"}\n'
 | 
						|
          f'\tInitial edge weights: {context["weight"] if context["weight"] is not None else "Random"}\n'
 | 
						|
          f'Network visualization settings: \n'
 | 
						|
          f'\tGraph visualization is enabled: {not context["silent"]}\n'
 | 
						|
          f'\tGraph visualization is horizontal: {context["horizontal"]}\n'
 | 
						|
          f'\tGraph visualization is vertical: {not context["horizontal"]}\n'
 | 
						|
          f'\tGraph visualization layer spacing: {context["spacing"]}\n'
 | 
						|
          f'\tTest data input count: {len(seq_input)}'
 | 
						|
          )
 | 
						|
 | 
						|
    # Initialize a dictionary of vectors for mapping to each layer node
 | 
						|
    # + layers['hidden'][0] = [3, 4, 5, 6] --> Hidden layer nodes are at index 3, 4, 5, 6
 | 
						|
    layers = network_layers()
 | 
						|
 | 
						|
    # A dictionary where matrix_dict['input'] maps to edge weight matrix for input_layer->first_hidden_layer
 | 
						|
    # matrix_dict['hidden'] maps to a list of matrices; matrix_dict['hidden'][0] is edge weights for first_hl->second_hl
 | 
						|
    # matrix_dict['output'] maps to edge weight matrix for last_hl->output_layer
 | 
						|
    matrix_dict = get_matrix_dict()
 | 
						|
    # Randomly generate perceptron bias if none was provided through CLI
 | 
						|
    bias_dict = get_bias_dict()
 | 
						|
 | 
						|
    info = train_network(seq_input, seq_label, bias_dict, matrix_dict)
 | 
						|
    # Final console output for overall results
 | 
						|
    info_total_temp = info['correct'] + info['wrong']
 | 
						|
    acc = 100.0 * float(info["correct"] / info_total_temp)
 | 
						|
    print(
 | 
						|
        f'\nCorrect: {info["correct"]} \t Wrong: {info["wrong"]} \t Total: {context["cycles"] * len(seq_input)}'
 | 
						|
        f'\nCycle 1 accuracy: {info["first_acc"]}% \tCycle {context["cycles"]} accuracy: {acc:.4f}%'
 | 
						|
        f'\n{round(acc - info["first_acc"], 4)}% change over {context["cycles"]} cycles '
 | 
						|
        f'\t{round((acc - info["first_acc"]) / context["cycles"], 4)}% average change per cycle'
 | 
						|
    )
 | 
						|
 | 
						|
    # All cycles have finished; Draw the network for a visual example to go with output
 | 
						|
    if not context["silent"]:
 | 
						|
        draw_graph(plt.subplot(), layers)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    sys.exit(main(sys.argv))
 |