#!/usr/bin/python3
import collections # for named tuple && Counter (multisets)
import re # for regex
from enum import IntEnum # for enumerations (enum from C)
import logging # for logging functions
import random # for stochastic chosing of programs
import time # for time.time()
import math # for productionFunction evaluation of math functions
##########################################################################
# auxiliary definitions
class OperatorType(IntEnum):
    """Operators and functions that may appear in a production function.

    The numeric value of each member is also its precedence: when an infix
    expression is converted to postfix form, members are compared by value.
    """

    # grouping (the right brace is never added to the postfix stack)
    left_brace = 1

    # comparison operators
    eq = 2  # a == b
    ne = 3  # a != b
    lt = 4  # a < b
    le = 5  # a <= b
    gt = 6  # a > b
    ge = 7  # a >= b

    # arithmetic operators
    add = 8
    subtract = 9
    multiply = 10
    divide = 11
    power = 12
    negate = 13

    # trigonometric functions
    # (names ending in 'd' use degrees instead of radians)
    sin = 14
    sind = 15
    asin = 16
    asind = 17
    cos = 18
    cosd = 19
    acos = 20
    acosd = 21
    tan = 22
    tand = 23
    atan = 24
    atand = 25
    atan2 = 26
    atan2d = 27
    cot = 28
    cotd = 29
    acot = 30
    acotd = 31

    # generic functions
    sqrt = 32
    abs = 33
    log = 34    # base e (natural) logarithm
    log10 = 35  # base 10 logarithm
    log2 = 36   # base 2 logarithm
# end class OperatorType
# lookup table: token type produced by the tokenizer -> OperatorType member
dictOperatorTypes = {
    # operator tokens have irregular names, so they are listed explicitly
    'OPERATOR_ADD': OperatorType.add,
    'OPERATOR_SUBTRACT': OperatorType.subtract,
    'OPERATOR_NEGATE': OperatorType.negate,
    'OPERATOR_MULTIPLY': OperatorType.multiply,
    'OPERATOR_DIVIDE': OperatorType.divide,
    'OPERATOR_POWER': OperatorType.power,
    'OPERATOR_EQUAL': OperatorType.eq,
    'OPERATOR_NOT_EQUAL': OperatorType.ne,
    'OPERATOR_LESS_THAN': OperatorType.lt,
    'OPERATOR_LESS_EQUAL': OperatorType.le,
    'OPERATOR_GREATER_THAN': OperatorType.gt,
    'OPERATOR_GREATER_EQUAL': OperatorType.ge,
}
# every function token is 'FUNCTION_' + the upper-cased member name; the
# function members are the contiguous run from sin up to log2
dictOperatorTypes.update(
    ('FUNCTION_' + member.name.upper(), member)
    for member in OperatorType
    if OperatorType.sin <= member <= OperatorType.log2
)
# record describing one parsed token: its kind, the matched text and the
# line / column where the match started
Token = collections.namedtuple('Token', 'type value line column')
##########################################################################
# class definitions
class NumericalPsystem():
    """Numerical P systems class

    :ivar list(str) H: list of strings (names of membranes)
    :ivar dict membranes: map (dictionary) between String membrane_name: Membrane object
    :ivar MembraneStructure structure: MembraneStructure object (list of structural elements) [1 [2 ]2 ]1
    :ivar list(Pobject) variables: list of Pobjects that appear throughout the P system
    :ivar list(Pobject) enzymes: list of enzyme Pobjects that appear throughout the P system
    :ivar file csvFile: file used for Comma Separated Value output (None = no CSV output)
    """

    def __init__(self):
        self.H = []
        self.membranes = {}
        self.structure = None
        self.variables = []
        self.enzymes = []
        self.csvFile = None

    def runSimulationStep(self):
        """Runs 1 simulation step consisting of executing one program (production & dispersion functions) for all membranes that have programs

        If a membrane without enzymes has more than one program, one is chosen randomly for execution.
        If a membrane has enzymes, every program that is activated by its enzyme is executed.

        :raises RuntimeError: re-raised (after being logged) when a production function fails to evaluate"""
        # production phase for all membranes
        for membraneName in self.H:
            membrane = self.membranes[membraneName]
            if len(membrane.programs) < 1:
                continue
            logging.debug("Production for membrane %s", membraneName)

            usesEnzymes = len(membrane.enzymes) > 0
            if usesEnzymes:
                # enzymes allow several programs to run in the same step
                membrane.chosenProgramNr = []
                for prgNr, prg in enumerate(membrane.programs):
                    if prg.isActivatedByEnzyme():
                        logging.debug("Program %d activated by enzyme %s", prgNr, prg.enzyme.name)
                        membrane.chosenProgramNr.append(prgNr)
            elif len(membrane.programs) == 1:
                membrane.chosenProgramNr = 0
            else:
                membrane.chosenProgramNr = random.randint(0, len(membrane.programs) - 1)

            try:
                if usesEnzymes:
                    # one produced value per active program, in the same order as chosenProgramNr
                    membrane.newValue = [membrane.programs[prgNr].prodFunction.evaluate()
                                         for prgNr in membrane.chosenProgramNr]
                else:
                    # produce a new value
                    membrane.newValue = membrane.programs[membrane.chosenProgramNr].prodFunction.evaluate()
            except RuntimeError:
                logging.error("Error encountered during production function of membrane %s, program %s",
                              membraneName, membrane.chosenProgramNr)
                # re-raise the exception to stop the simulator
                raise

        ## reset variable phase
        logging.debug("Resetting all variables that are part of production functions to 0")
        for variable in self.variables:
            if variable.wasConsumed:
                variable.value = 0
                # if the program it is part of will be executed again, it will be marked as consumed
                variable.wasConsumed = False

        ## reset enzymes phase
        logging.debug("Resetting all enzymes that are part of production functions to 0")
        for enzyme in self.enzymes:
            if enzyme.wasConsumed:
                enzyme.value = 0
                # if the program it is part of will be executed again, it will be marked as consumed
                enzyme.wasConsumed = False

        # distribution phase for all membranes
        for membraneName in self.H:
            membrane = self.membranes[membraneName]
            if len(membrane.programs) < 1:
                continue
            chosen = membrane.chosenProgramNr
            if isinstance(chosen, list):
                # membrane uses enzymes: distribute the value of each active
                # program (nothing happens when no program is active)
                for prgNr, producedValue in zip(chosen, membrane.newValue):
                    distribFunction = membrane.programs[prgNr].distribFunction
                    logging.debug("Distribution for membrane %s program %d of unitary value %.02f",
                                  membraneName, prgNr,
                                  producedValue / distribFunction.proportionTotal)
                    # distribute the previously produced value
                    distribFunction.distribute(producedValue)
            elif isinstance(chosen, int):
                # membrane does not use enzymes: exactly one program was chosen
                distribFunction = membrane.programs[chosen].distribFunction
                logging.debug("Distribution for membrane %s of unitary value %.02f",
                              membraneName,
                              membrane.newValue / distribFunction.proportionTotal)
                # distribute the previously produced value
                distribFunction.distribute(membrane.newValue)

        logging.info("Simulation step finished succesfully")
    # end runSimulationStep()

    def _writeCsvState(self, stepNr):
        """Appends one CSV row (step number, variable values, empty column, enzyme values); does nothing when no CSV file is open

        :stepNr: the step number written in the first column"""
        if self.csvFile is None:
            return
        self.csvFile.write("%d, %s, ,%s\n" % (
            stepNr,
            ", ".join(str(var.value) for var in self.variables),
            ", ".join(str(enz.value) for enz in self.enzymes)))
    # end _writeCsvState()

    def simulate(self, stepByStepConfirm=False, printEachSystemState=True, maxSteps=-1, maxTime=-1):
        """Simulates the numericP system until one of the imposed limits is reached

        The limits are checked after each step, so at least one step is always executed.
        A limit that is <= 0 is disabled; with both limits disabled the loop never ends by itself.

        :stepByStepConfirm: True / False - whether or not to wait for confirmation before starting the next simulation step
        :printEachSystemState: True / False - whether or not to print the P system state after the execution of each simulation step
        :maxSteps: The maximum number of simulation steps to run
        :maxTime: The maximum time span (in seconds) that the entire simulation can last"""
        currentStep = 1
        # time.time() == time in seconds since the Epoch
        startTime = currentTime = time.time()
        finalTime = currentTime + maxTime

        # write initial system state into csv file
        # NOTE(review): the initial state and the state after the first step are both labelled step 1
        self._writeCsvState(currentStep)

        while True:
            logging.info("Starting simulation step %d", currentStep)
            self.runSimulationStep()
            currentTime = time.time()
            self._writeCsvState(currentStep)

            if printEachSystemState:
                self.print()
            if stepByStepConfirm:
                input("Press ENTER to continue")

            # if there is a maximum time limit set and it was exceeded
            if (currentTime >= finalTime) and (maxTime > 0):
                logging.warning("Maximum time limit exceeded; Simulation stopped")
                break  # stop the simulation
            # if there is a maximum step limit set and it was exceeded
            if (currentStep >= maxSteps) and (maxSteps > 0):
                logging.warning("Maximum number of simulation steps exceeded; Simulation stopped")
                break  # stop the simulation
            currentStep += 1
        # end while loop

        logging.info("Simulation finished succesfully after %d steps and %f seconds; End state below:",
                     currentStep, currentTime - startTime)
        self.print()
    # end simulate()

    def print(self, indentSpaces=2, toString=False, withPrograms=False):
        """Print the entire Numerical P system with a given indentation level

        :indentSpaces: number of spaces used for indentation
        :toString: write to a string instead of stdout
        :withPrograms: print out the programs from each membrane, along with the membrane variables
        :returns: string print of the membrane if toString = True otherwise returns None """
        parts = ["num_ps = {\n"]
        for membraneName in self.H:
            membrane = self.membranes[membraneName]
            # each membrane is printed one indentation level deeper than its name
            parts.append(" " * indentSpaces + "%s:\n%s" % (
                membraneName,
                membrane.print(indentSpaces * 2, toString=True, withPrograms=withPrograms)))
        parts.append("}\n")
        result = "".join(parts)

        if toString:
            return result
        print(result)
    # end print()

    def openCsvFile(self):
        """Opens a .csv (Comma Separated Value) file where the values of all variables and enzymes are printed at each simulation step

        The output file is named using the pattern pep_DAY-MONTH-YEAR_HOUR-MINUTE-SECOND.csv
        The file is kept open in self.csvFile; this class never closes it."""
        self.csvFile = open("pep_%s.csv" % time.strftime("%d-%m-%Y_%H-%M-%S"), mode="w")
        self.csvFile.write("PeP csv output. Format = STEP_NR VARIABLE_COLUMNS EMPTY_COLUMN ENZYME_COLUMNS\n")
        # header row: the names come from this system (not from a global object)
        self.csvFile.write("step, %s, ,%s\n" % (
            ", ".join(var.name for var in self.variables),
            ", ".join(enz.name for enz in self.enzymes)))
    # end openCsvFile()
# end class NumericalPsystem
class MembraneStructure(list):
    """Membrane structure of a P system, kept as a plain list of structural elements"""

    def __init__(self):
        # always starts out empty; elements are appended afterwards
        super().__init__()
# end class MembraneStructure
class Membrane():
    """A membrane of the P system; it may contain other membranes or be contained in one

    :ivar list(Pobject) variables: P objects of this membrane
    :ivar list(Program) programs: Program objects of this membrane
    :ivar int|list chosenProgramNr: number of the program chosen for execution OR list of chosen program numbers when enzymes are used
    :ivar double|list(double) newValue: value produced by the previous production phase OR list of values when enzymes are used
    :ivar list(Pobject) enzymes: enzyme P objects of this membrane
    :ivar Membrane parent: parent membrane (Membrane object)
    :ivar dict children: map (dictionary) between String membrane_name: Membrane object
    """

    def __init__(self, parentMembrane=None):
        self.variables = []
        self.programs = []
        self.chosenProgramNr = 0
        self.newValue = 0
        self.enzymes = []
        self.parent = parentMembrane
        self.children = {}

    def print(self, indentSpaces=2, toString=False, withPrograms=False):
        """Print a membrane with a given indentation level

        :indentSpaces: number of spaces used for indentation
        :toString: write to a string instead of stdout
        :withPrograms: print out the programs of this membrane, after its variables and enzymes
        :returns: string print of the membrane if toString = True otherwise returns None """
        pad = " " * indentSpaces

        # one line with all variables, one line with all enzymes
        variableItems = "".join(" %s: %.2f, " % (pobj.name, pobj.value) for pobj in self.variables)
        enzymeItems = "".join(" %s: %.2f, " % (pobj.name, pobj.value) for pobj in self.enzymes)
        pieces = [
            pad + "var = {" + variableItems + "}\n",
            pad + "E = {" + enzymeItems + "}\n",
        ]

        # optionally one line per program
        if withPrograms:
            for nr, program in enumerate(self.programs):
                programText = program.print(indentSpaces=0, toString=True)
                pieces.append(pad + "pr_%d = { %s }\n" % (nr, programText))

        text = "".join(pieces)
        if not toString:
            print(text)
            return None
        return text
    # end print()
# end class Membrane
class Program():
    """Program class

    :ivar ProductionFunction prodFunction: ProductionFunction object
    :ivar DistributionFunction distribFunction: DistributionFunction object
    :ivar Pobject enzyme: Pobject if an enzyme is used for this program (None otherwise)
    """

    def __init__(self):
        self.prodFunction = None
        self.distribFunction = None
        self.enzyme = None

    def print(self, indentSpaces=2, toString=False):
        """Print a program with a given indentation level

        :indentSpaces: number of spaces used for indentation
        :toString: write to a string instead of stdout
        :returns: string print of the rule if toString = True otherwise returns None """
        # the enzyme name is shown between the production and the distribution
        # function only when the program actually has an enzyme
        if isinstance(self.enzyme, Pobject):
            enzymeCondition = " [" + self.enzyme.name + " -> ] "
        else:
            enzymeCondition = " -> "
        result = " " * indentSpaces + self.prodFunction.infixExpression + enzymeCondition + self.distribFunction.expression

        if toString:
            return result
        print(result)
    # end print()

    def isActivatedByEnzyme(self):
        """Checks whether the production function activation condition Enzyme > min(PROD_FUNCTION_VARIABLES) is true

        A production function that contains no variables is always active.

        :returns: True / False"""
        # only P object variables are considered (numbers and operators are skipped)
        variableValues = [prodItem.value
                          for prodItem in self.prodFunction.items
                          if isinstance(prodItem, Pobject)]

        # no variables are present in the production function
        # so the production function is active
        if not variableValues:
            return True

        # NOTE(review): self.enzyme is dereferenced unconditionally, so a
        # program without an enzyme (self.enzyme is None) whose production
        # function contains variables raises AttributeError here
        return self.enzyme.value > min(variableValues)
    # end isActivatedByEnzyme()
# end class Program
class ProductionFunction():
    """Production function class that stores expressions using the postfix (reversed polish) form

    :ivar str infixExpression: string representation of the original expression from the input file (written in infix form)
    :ivar list postfixStack: stack of values used while evaluating (rebuilt by every evaluate() call)
    :ivar list items: list of operands and operators written in postfix (reverse polish) form
    """

    # OperatorType -> (number of operands, callable)
    # Operands are passed in the order in which they were pushed on the stack,
    # i.e. callable(op1, op2) computes "op1 OPERATOR op2".
    # Functions whose name ends with 'd' use degrees instead of radians.
    _OPERATORS = {
        # (unary) operators and single parameter functions
        OperatorType.negate: (1, lambda x: -x),
        OperatorType.sin: (1, math.sin),
        OperatorType.sind: (1, lambda x: math.sin(math.radians(x))),
        OperatorType.asin: (1, math.asin),
        OperatorType.asind: (1, lambda x: math.degrees(math.asin(x))),
        OperatorType.cos: (1, math.cos),
        OperatorType.cosd: (1, lambda x: math.cos(math.radians(x))),
        OperatorType.acos: (1, math.acos),
        OperatorType.acosd: (1, lambda x: math.degrees(math.acos(x))),
        OperatorType.tan: (1, math.tan),
        OperatorType.tand: (1, lambda x: math.tan(math.radians(x))),
        OperatorType.atan: (1, math.atan),
        OperatorType.atand: (1, lambda x: math.degrees(math.atan(x))),
        # cot() is determined from tan(), acot() from atan()
        OperatorType.cot: (1, lambda x: 1 / math.tan(x)),
        OperatorType.cotd: (1, lambda x: 1 / math.tan(math.radians(x))),
        OperatorType.acot: (1, lambda x: math.atan(1 / x)),
        OperatorType.acotd: (1, lambda x: math.degrees(math.atan(1 / x))),
        OperatorType.sqrt: (1, math.sqrt),
        OperatorType.abs: (1, math.fabs),
        OperatorType.log: (1, math.log),
        OperatorType.log10: (1, math.log10),
        OperatorType.log2: (1, math.log2),
        # binary functions
        OperatorType.atan2: (2, math.atan2),
        OperatorType.atan2d: (2, lambda op1, op2: math.degrees(math.atan2(op1, op2))),
        # (binary) arithmetic operators
        OperatorType.add: (2, lambda op1, op2: op1 + op2),
        OperatorType.subtract: (2, lambda op1, op2: op1 - op2),
        OperatorType.multiply: (2, lambda op1, op2: op1 * op2),
        OperatorType.divide: (2, lambda op1, op2: op1 / op2),
        OperatorType.power: (2, lambda op1, op2: op1 ** op2),
        # comparison operators produce 1 (true) or 0 (false)
        OperatorType.eq: (2, lambda op1, op2: int(op1 == op2)),
        OperatorType.ne: (2, lambda op1, op2: int(op1 != op2)),
        OperatorType.lt: (2, lambda op1, op2: int(op1 < op2)),
        OperatorType.le: (2, lambda op1, op2: int(op1 <= op2)),
        OperatorType.gt: (2, lambda op1, op2: int(op1 > op2)),
        OperatorType.ge: (2, lambda op1, op2: int(op1 >= op2)),
    }

    def __init__(self):
        self.infixExpression = ""
        self.postfixStack = []
        self.items = []

    def evaluate(self):
        """Evaluates the postfix form of a production function and returns the computed value.

        During the evaluation, Pobject references are replaced with their value
        and each referenced Pobject is marked as consumed (wasConsumed = True).
        Errors raised by the mathematical operations themselves (for example
        ZeroDivisionError or a math domain ValueError) are not intercepted.

        :returns: the computed value of the production function, as a numeric value
        :raises RuntimeError: if the expression is empty or has the wrong number of operands or operators"""
        stack = self.postfixStack = []

        for item in self.items:
            # numeric values get added on the stack
            # (exact type check on purpose: OperatorType members are ints as well)
            if type(item) in (int, float):
                stack.append(item)
            # the value of Pobjects is added to the stack
            elif isinstance(item, Pobject):
                stack.append(item.value)
                # mark this Pobject as consumed
                item.wasConsumed = True
            else:
                try:
                    operandCount, function = self._OPERATORS[item]
                except (KeyError, TypeError):
                    # neither a number, a Pobject nor a supported operator;
                    # it is skipped, as before, but no longer silently
                    logging.warning("Ignoring unsupported production function item %r", item)
                    continue
                # an operator without enough operands is a malformed expression
                if len(stack) < operandCount:
                    raise RuntimeError('evaluation error / wrong number of operands or operators')
                # take the operands off the stack, keeping their push order
                operands = stack[-operandCount:]
                del stack[-operandCount:]
                # apply the operator and add the result back to the stack
                stack.append(function(*operands))
            logging.debug("postfixStack = %s", stack)

        # exactly one value (the result) must remain
        if len(stack) != 1:
            raise RuntimeError('evaluation error / wrong number of operands or operators')
        return stack[0]
    # end evaluate()
# end class ProductionFunction
class DistributionFunction(list):
    """Distribution function: a list of distribution rules plus their summed proportions

    :ivar int proportionTotal: the sum of all proportions
    :ivar str expression: string representation of the distribution function
    """

    def __init__(self):
        """Start with an empty rule list, a zero proportion total and an empty expression"""
        super().__init__()
        self.proportionTotal = 0
        self.expression = ""

    def distribute(self, newValue):
        """Add to each rule's variable its share of newValue

        The share of a rule is (rule proportion / proportionTotal) * newValue.

        :newValue: the value that has to be split between the variables referenced by the distribution rules"""
        for rule in self:
            fraction = rule.proportion / self.proportionTotal
            target = rule.variable
            target.value += fraction * newValue
    # end distribute()
# end class DistributionFunction
class DistributionRule():
    """One distribution rule (PROPORTION | VARIABLE) of a program's distribution function

    :ivar int proportion: the proportion given to the variable
    :ivar Pobject variable: the variable that receives the value
    """

    def __init__(self):
        self.proportion = 0
        self.variable = None

    def print(self, indentSpaces=2, toString=False):
        """Print a distribution rule with a given indentation level

        :indentSpaces: number of spaces used for indentation
        :toString: write to a string instead of stdout
        :returns: string print of the rule if toString = True otherwise returns None """
        indentation = " " * indentSpaces
        # PROPORTION | VAR_NAME
        ruleText = "%d|%s" % (self.proportion, self.variable.name)
        text = indentation + ruleText

        if not toString:
            print(text)
            return None
        return text
    # end print()
# end class DistributionRule
class Pobject():
    """Mutable named value, so that every membrane that references the same P object sees and modifies the same value

    :ivar str name: name of the P object
    :ivar double value: current value
    :ivar boolean wasConsumed: True after the object was used in a production function
    """

    def __init__(self, name='', value=0):
        self.name, self.value = name, value
        # a fresh object has not been used by any production function yet
        self.wasConsumed = False
# end class Pobject
##########################################################################
# global variables
# module-wide logging level, INFO by default
# NOTE(review): nothing in this part of the file reads it -- presumably the
# program entry point passes it to the logging configuration; confirm there
logLevel = logging.INFO
##########################################################################
# auxiliary functions
def processPostfixOperator(postfixStack, operator):
    """Places a new operator on the postfix operator stack, first popping the operators that must be output before it

    While the operator on top of the stack has a precedence that is higher than
    or equal to that of the new operator, it is popped and added to the output
    list. The new operator is then pushed on the stack.
    The stack is modified in place and the same list object is returned.

    :postfixStack: stack used for operators
    :operator: OperatorType variable
    :returns: postfixStack, outputList - outputList == array of OperatorType elements that have been popped from the stack"""
    poppedOperators = []

    # pop everything that does not have a lower precedence than the new operator
    while len(postfixStack) > 0 and operator <= postfixStack[-1]:
        poppedOperators.append(postfixStack.pop())

    # the stack is now empty or topped by a lower precedence operator
    postfixStack.append(operator)

    return postfixStack, poppedOperators
# end processPostfixOperator()
def tokenize(code):
    """Generate the tokens of the input text, one Token at a time

    Spaces, tabs and carriage returns are skipped. Everything from a '#' up to
    the end of the line is a comment and is discarded.
    A function name (sin, log10, ...) is only recognised when it is a whole
    word, so identifiers that merely start with a function name (for example
    'sinus' or 'abs_val') are returned as a single ID token.

    adapted from https://docs.python.org/3/library/re.html#writing-a-tokenizer

    :code: the input text
    :returns: generator of Token(type, value, line, column); line is counted from 1, column from 0
    :raises RuntimeError: when an unexpected character is found outside of a comment"""
    # ORDER MATTERS here: more complex tokens (e.g. >= are checked before >) to avoid incorrect parsing
    # trigonometric functions ending in 'd' work in degrees, the others in radians
    function_names = (
        'asind', 'asin', 'sind', 'sin',
        'acosd', 'acos', 'cosd', 'cos',
        'atan2d', 'atan2', 'atand', 'atan', 'tand', 'tan',
        'acotd', 'acot', 'cotd', 'cot',
        'sqrt',    # square root function
        'abs',     # absolute value (modulus) function
        'log10',   # base 10 logarithm function
        'log2',    # base 2 logarithm function
        'log',     # base e (natural) logarithm function
    )
    # the trailing \b stops a function name from matching the start of a longer identifier
    token_specification = [
        ('FUNCTION_%s' % name.upper(), r'%s\b' % name) for name in function_names
    ] + [
        ('NUMBER_FLOAT', r'\d+\.\d+'),          # Float number
        ('NUMBER', r'\d+'),                     # Integer number
        ('OPERATOR_NOT_EQUAL', r'\!\='),        # Not equal operator (!=)
        ('OPERATOR_EQUAL', r'\=\='),            # Equal operator (==)
        ('OPERATOR_LESS_EQUAL', r'\<\='),       # Less or equal operator (<=)
        ('OPERATOR_GREATER_EQUAL', r'\>\='),    # Greater or equal operator (>=)
        ('ASSIGN', r'='),                       # Assignment operator '='
        ('END', r';'),                          # Statement terminator ';'
        ('ID', r'[\w]+'),                       # Identifiers
        ('L_BRACE', r'\('),                     # Left brace '('
        ('R_BRACE', r'\)'),                     # Right brace ')'
        ('L_CURLY_BRACE', r'{'),                # Left curly brace '{'
        ('R_CURLY_BRACE', r'}'),                # Right curly brace '}'
        ('L_BRACKET', r'\['),                   # Left bracket (straight brace) '['
        ('R_BRACKET', r'\]'),                   # Right bracket (straight brace) ']'
        ('COLUMN', r','),                       # comma ','
        ('PROD_DISTRIB_SEPARATOR', r'->'),      # Program production and distribution component separator sign '->'
        ('OPERATOR_ADD', r'\+'),                # Addition operator (+)
        ('OPERATOR_SUBTRACT', r'\-'),           # Subtraction operator (-)
        ('OPERATOR_NEGATE', r'\~'),             # Negation (sign change) operator (~)
        ('OPERATOR_MULTIPLY', r'\*'),           # Multiplication operator (*)
        ('OPERATOR_DIVIDE', r'\/'),             # Division operator (/)
        ('OPERATOR_POWER', r'\^'),              # Power operator (^)
        ('OPERATOR_LESS_THAN', r'\<'),          # Less than operator (<)
        ('OPERATOR_GREATER_THAN', r'\>'),       # Greater than operator (>)
        ('DISTRIBUTION_SIGN', r'\|'),           # Distribution rule sign '|'
        ('NEWLINE', r'\n'),                     # Line endings
        ('COMMENT', r'#'),                      # Comment (anything after #, up to the end of the line is discarded)
        ('SKIP', r'[ \t\r]+'),                  # Skip over spaces, tabs and the '\r' of Windows line endings
        ('MISMATCH', r'.'),                     # Any other character
    ]
    # join all groups into one regex expr; ex: (?P<NUMBER>\d+) | ...
    tok_regex = '|'.join('(?P<%s>%s)' % pair for pair in token_specification)

    line_num = 1
    line_start = 0
    in_comment = False
    # iteratively search and return each match (for any of the groups)
    for mo in re.finditer(tok_regex, code):
        kind = mo.lastgroup     # name of the group that matched
        value = mo.group(kind)  # the matched string (for group kind)

        if kind == 'NEWLINE':
            line_start = mo.end()
            line_num += 1
            in_comment = False  # a comment ends at the end of its line
        elif kind == 'COMMENT':
            in_comment = True
        elif kind == 'SKIP' or in_comment:
            # whitespace, or anything at all inside a comment, is discarded
            continue
        elif kind == 'MISMATCH':
            raise RuntimeError('%r unexpected on line %d' % (value, line_num))
        else:
            column = mo.start() - line_start
            yield Token(kind, value, line_num, column)
# end tokenize()
def print_token_by_line(v):
    """Prints tokens separated by spaces on their original line (with line numbering)

    Whenever a token's ``line`` is greater than the last line number printed,
    a newline followed by that line number is emitted before the token value.
    Every token value is followed by a single space; no trailing newline is
    printed.

    :v: iterable of tokens that expose ``line`` and ``value`` attributes
    """
    line_num = 0
    for token in v:
        # start a new, numbered output line when the source line advances
        if token.line > line_num:
            line_num = token.line
            print('\n%d ' % line_num, end='')
        print(token.value, end=" ")
# end print_token_by_line()
def process_tokens(tokens, parent, index):
    """Process tokens recursively and return a P system structure (or a subcomponent of the same type as parent)

    The exact type of ``parent`` selects which grammar branch handles each
    token.  Sub-components are built by recursive calls that start at
    ``index + 1`` and hand back the index at which they stopped.  For every
    parent type, an 'END' token finishes the current component.

    :tokens: the list of tokens to be processed
    :parent: an object that represents the type of the result
    :index: the start index in the list of tokens
    :returns: index - the current index in the token list (after finishing this component)
    :returns: result - an object that is the result of processing the input parent and tokens
    :raises RuntimeError: on an unexpected token, an unbalanced ')', a
        distribution rule without a proportion, too many initial values, or
        when ``index`` is already past the end of ``tokens``
    """
    logging.debug("process_tokens (parent_type = %s, index = %d)", type(parent), index)

    # a (sub)component can not start past the end of the input
    if index >= len(tokens):
        raise RuntimeError("Unexpected end of input while building %s" % type(parent).__name__)

    result = parent # construct the result of specified type
    # parent is never rebound below, so its type is constant for this call.
    # NOTE(review): exact type() comparison is kept on purpose; MembraneStructure
    # and DistributionFunction are appended to like lists, so if they subclass
    # list an isinstance() test would also match the generic list branch -- confirm
    # against the class definitions before changing this.
    parent_type = type(parent)
    prev_token = tokens[index]
    # for processing distribution rules
    distribRule = None

    while index < len(tokens):
        token = tokens[index]
        logging.debug("token = '%s'", token.value)

        if parent_type == NumericalPsystem:
            logging.debug("processing as NumericalPsystem")
            if token.type == 'ASSIGN':
                # the token before '=' names the component being defined
                if prev_token.value == 'H':
                    logging.info("building membrane list")
                    index, result.H = process_tokens(tokens, list(), index + 1)
                # if the prev_token is the name of a membrane
                elif prev_token.value in result.H:
                    logging.info("building Membrane")
                    index, result.membranes[prev_token.value] = process_tokens(tokens, Membrane(), index + 1)
                elif prev_token.value == 'structure':
                    logging.info("building membrane structure")
                    index, result.structure = process_tokens(tokens, MembraneStructure(), index + 1)
                else:
                    raise RuntimeError("Unexpected token '%s' on line %d" % (prev_token.value, prev_token.line))
        # end if parent == NumericalPsystem

        elif parent_type == MembraneStructure:
            logging.debug("processing as MembraneStructure")
            if token.type in ('ID', 'NUMBER', 'L_BRACKET', 'R_BRACKET'):
                parent.append(token)
            elif token.type == 'END':
                logging.debug("finished the MembraneStructure with result = %s", result)
                return index, result
            else:
                raise RuntimeError("Unexpected token '%s' on line %d" % (token.value, token.line))
        # end if parent == MembraneStructure

        elif parent_type == Membrane:
            logging.debug("processing as Membrane")
            if token.type == 'ASSIGN':
                if prev_token.value == 'var':
                    logging.info("building variable list")
                    index, variables = process_tokens(tokens, list(), index + 1)
                    for var in variables:
                        result.variables.append(Pobject(name = var))
                elif prev_token.value == 'E':
                    logging.info("building enzyme list")
                    index, enzymes = process_tokens(tokens, list(), index + 1)
                    for enz in enzymes:
                        result.enzymes.append(Pobject(name = enz))
                elif prev_token.value == 'pr':
                    logging.info("building Program")
                    index, program = process_tokens(tokens, Program(), index + 1)
                    result.programs.append(program)
                elif prev_token.value == 'var0':
                    logging.info("building var0 list")
                    index, variables = process_tokens(tokens, list(), index + 1)
                    # initial values are matched positionally to the declared variables
                    if len(variables) > len(result.variables):
                        raise RuntimeError("Too many initial values for 'var0' on line %d (%d values for %d variables)"
                                % (token.line, len(variables), len(result.variables)))
                    for i, var in enumerate(variables):
                        result.variables[i].value = var
                elif prev_token.value == 'E0':
                    logging.info("building E0 list")
                    index, enzymes = process_tokens(tokens, list(), index + 1)
                    # initial values are matched positionally to the declared enzymes
                    if len(enzymes) > len(result.enzymes):
                        raise RuntimeError("Too many initial values for 'E0' on line %d (%d values for %d enzymes)"
                                % (token.line, len(enzymes), len(result.enzymes)))
                    for i, enz in enumerate(enzymes):
                        result.enzymes[i].value = enz
                else:
                    # report the unrecognised key (the token before '='), as the NumericalPsystem branch does
                    raise RuntimeError("Unexpected token '%s' on line %d" % (prev_token.value, prev_token.line))
        # end if parent == Membrane

        elif parent_type == Program:
            logging.debug("processing as Program")
            if token.type == 'L_CURLY_BRACE':
                logging.info("building production function")
                index, result.prodFunction = process_tokens(tokens, ProductionFunction(), index + 1)
            elif token.type == 'L_BRACKET':
                logging.info("storing enzyme required by program")
                if index + 1 >= len(tokens):
                    raise RuntimeError("Unexpected end of input after '%s' on line %d" % (token.value, token.line))
                next_token = tokens[index + 1]
                if next_token.type != 'ID':
                    raise RuntimeError("Unexpected token '%s' on line %d" % (next_token.value, next_token.line))
                # storing enzyme as string for now, will be referenced later to a Pobject
                result.enzyme = next_token.value
                # together with the increment at the end of the loop this also steps over the token that follows the enzyme name
                index += 2
            # build a distribution rule if the PROD_DISTRIB_SEPARATOR '->' is reached for a non-enzymatic program or R_BRACKET ']' is reached for an enzymatic program
            elif ((token.type == 'PROD_DISTRIB_SEPARATOR' and result.enzyme is None)
                    or (token.type == 'R_BRACKET' and isinstance(result.enzyme, str))):
                logging.info("building distribution rule")
                index, result.distribFunction = process_tokens(tokens, DistributionFunction(), index + 1)
            elif token.type == 'R_CURLY_BRACE':
                logging.debug("finished this Program with result = %s", result)
                return index, result
            elif token.type == 'END':
                logging.debug("finished this block with result = %s", result)
                return index, result
            else:
                raise RuntimeError("Unexpected token '%s' on line %d" % (token.value, token.line))
        # end if parent == Program

        elif parent_type == ProductionFunction:
            logging.debug("processing as ProductionFunction")
            # construct a string representation of the expression (in infix form)
            if token.type != 'PROD_DISTRIB_SEPARATOR' and token.type != 'L_BRACKET':
                result.infixExpression += " " + token.value

            if token.type == 'NUMBER':
                logging.debug("processing integer number")
                result.items.append(int(token.value))
            elif token.type == 'NUMBER_FLOAT':
                logging.debug("processing float number")
                result.items.append(float(token.value))
            elif token.type == 'ID':
                logging.debug("processing variable")
                result.items.append(token.value) # store as string for now, reference to real P object later
            elif token.type == 'L_BRACE':
                logging.debug("processing operator %s", token.value)
                # add the current operator to the postfix transformation
                result.postfixStack.append(OperatorType.left_brace)
            elif token.type == 'R_BRACE':
                logging.debug("processing operator %s", token.value)
                # pop all elements in the stack up until the left brace and add them to the postfix form
                while result.postfixStack and result.postfixStack[-1] != OperatorType.left_brace:
                    result.items.append(result.postfixStack.pop())
                # an empty stack means there was no matching '('
                if not result.postfixStack:
                    raise RuntimeError("Unbalanced '%s' on line %d" % (token.value, token.line))
                # now that all elements that were above the left_brace were removed, we pop the left_brace (now the top-most element) from the stack
                result.postfixStack.pop()
            elif token.type in dictOperatorTypes:
                logging.debug("processing operator %s", token.value)
                # current operator as OperatorType enum value
                currentOperator = dictOperatorTypes[token.type]
                result.postfixStack, newOutputList = processPostfixOperator(result.postfixStack, currentOperator)
                result.items.extend(newOutputList)
            elif token.type == 'PROD_DISTRIB_SEPARATOR' or token.type == 'L_BRACKET':
                logging.debug("production function end; emptying stack")
                # pop all elements in the stack
                while result.postfixStack:
                    result.items.append(result.postfixStack.pop())
                logging.debug("finished the production function with result = %s", result.items)
                result.infixExpression = result.infixExpression[1:] # strip the first character (leading space)
                # the Program should also see PROD_DISTRIB_SEPARATOR or L_BRACKET in order to trigger the build of a distribution function or to store an enzyme for this program
                return index - 1, result
            elif token.type == 'L_CURLY_BRACE':
                # nothing to do; the common code at the end of the loop moves on to the next token
                logging.debug("skipped left curly brace")
            else:
                raise RuntimeError("Unexpected token '%s' on line %d" % (token.value, token.line))
        # end if parent == ProductionFunction

        elif parent_type == DistributionFunction:
            logging.debug("processing as DistributionFunction")
            if token.type == 'NUMBER':
                # construct a new distribution rule
                distribRule = DistributionRule()
                distribRule.proportion = int(token.value)
                result.expression += " " + token.value
            elif token.type == 'ID' and prev_token.type == "DISTRIBUTION_SIGN":
                # a rule only exists once its proportion (NUMBER) has been read
                if distribRule is None:
                    raise RuntimeError("Missing proportion before '|%s' on line %d" % (token.value, token.line))
                # finalize the distribution rule
                distribRule.variable = token.value # store as string for now, reference later
                result.proportionTotal += distribRule.proportion
                result.append(distribRule) # store the new distribution rule
                result.expression += token.value
                # the rule is complete; a new NUMBER is required before the next variable
                distribRule = None
            # is used to separate rules, not needed
            elif token.type == 'OPERATOR_ADD':
                result.expression += " " + token.value
                logging.debug("skipped '+'")
            # is used to separate the proportion from the variable, not needed
            elif token.type == "DISTRIBUTION_SIGN":
                result.expression += token.value
                logging.debug("skipped '|'")
            elif token.type == 'R_CURLY_BRACE':
                logging.debug("finished this DistributionFunction with result = %s", result)
                result.expression = result.expression[1:] # strip the first character (leading space)
                return index, result
            else:
                raise RuntimeError("Unexpected token '%s' on line %d" % (token.value, token.line))
        # end if parent == DistributionFunction

        elif parent_type == DistributionRule:
            logging.debug("processing as DistributionRule")
            if token.type == 'R_CURLY_BRACE':
                logging.debug("finished this DistributionRule with result = %s", result)
                return index, result
        # end if parent == DistributionRule

        elif parent_type == list:
            logging.debug("processing as List")
            if token.type == 'ID':
                result.append(token.value)
            elif token.type == 'NUMBER':
                # a '-' directly before the number makes it negative
                if prev_token.type == 'OPERATOR_SUBTRACT':
                    result.append(-int(token.value))
                else:
                    result.append(int(token.value))
            elif token.type == 'NUMBER_FLOAT':
                if prev_token.type == 'OPERATOR_SUBTRACT':
                    result.append(-float(token.value))
                else:
                    result.append(float(token.value))

        elif parent_type == str:
            logging.debug("processing as Str")
            if token.type == 'ID':
                result = token.value

        elif parent_type == int:
            logging.debug("processing as Int")
            if token.type == 'NUMBER':
                result = int(token.value)

        else:
            logging.debug("processing as GENERAL")
            # process the token generally
            if token.type == 'ASSIGN':
                logging.info("building NumericalPsystem")
                index, result = process_tokens(tokens, NumericalPsystem(), index + 1)

        # common to every parent type: 'END' closes the current component.
        # After a recursive call `token` is still the token that triggered it,
        # so the END on which the sub-component stopped is stepped over below.
        if token.type == 'END':
            logging.debug("finished this block with result = %s", result)
            return index, result

        prev_token = token
        index += 1
    return index, result
#end process_tokens
# end readinputfile()
##########################################################################
# MAIN
if __name__ == "__main__":
    import sys # for argv

    # verbosity: INFO unless raised (-v | --debug) or lowered (-v0 | --error) on the command line
    logLevel = logging.INFO
    if '--debug' in sys.argv or '-v' in sys.argv:
        logLevel = logging.DEBUG
    elif '--error' in sys.argv or '-v0' in sys.argv:
        logLevel = logging.ERROR

    try:
        import colorlog # colors log output

        formatter = colorlog.ColoredFormatter(
                "%(log_color)s%(levelname)-8s %(message)s %(reset)s",
                datefmt=None,
                reset=True,
                log_colors={
                        'DEBUG': 'cyan',
                        'INFO': 'green',
                        'WARNING': 'yellow',
                        'ERROR': 'red',
                        'CRITICAL': 'red,bg_white',
                },
                secondary_log_colors={},
                style='%'
        )
        colorlog.basicConfig(stream = sys.stdout, level = logLevel)
        stream = colorlog.root.handlers[0]
        stream.setFormatter(formatter)
    # colorlog not available
    except ImportError:
        logging.basicConfig(format='%(levelname)s:%(message)s', level = logLevel)

    if len(sys.argv) < 2:
        logging.error("Expected input file path as parameter")
        print("Usage: pep.py PEP_INPUT_FILE [options]")
        print(" [options] can be:")
        print(" * -n NR: stop the simulation after NR execution steps")
        print(" * --step: step-by-step execution")
        print(" * --csv: write a Comma Separated Values (CSV) file that contains the values of all Pobjects at each simulation step")
        print(" * -v | --debug: increase verbosity")
        print(" * -v0 | --error: decrease verbosity")
        # sys.exit() instead of the interactive-only exit() helper provided by the site module
        sys.exit(1)

    # step by step simulation
    step = '--step' in sys.argv

    # nr of simulation steps
    nrSteps = -2 # -2 == undefined, -1 == unlimited
    if '-n' in sys.argv:
        try:
            nrSteps = int(sys.argv[sys.argv.index('-n') + 1])
        except (ValueError, IndexError):
            logging.error("Expected a number (of simulation steps) after the '-n' parameter")
        finally:
            # if nrSteps still is undefined (-2)
            if nrSteps == -2:
                sys.exit(1)

    system = readInputFile(sys.argv[1])

    if "--csv" in sys.argv:
        system.openCsvFile()

    if logLevel <= logging.WARNING:
        # print the structure of the P system
        system.print(indentSpaces=4, withPrograms = True)

    try:
        system.simulate(stepByStepConfirm = step, maxSteps = nrSteps)
    finally:
        # close the csv output even when the simulation is interrupted or fails
        if system.csvFile is not None:
            logging.info("Wrote csv output file %s" % system.csvFile.name)
            system.csvFile.close()

    print("\n\n")