Source code for src.Libs.FileIO.FileDataClass

#!/usr/bin/env python3
"""
This file is part of the PAFFrontendSim.

The PAFFrontendSim is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License v3 as published by the Free Software Foundation.

The PAFFrontendSim is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with the PAFFrontendSim. If not, see https://www.gnu.org/licenses/.
"""
#############################################################################
# import libs
#############################################################################
import numpy                          as     np
import matplotlib.pyplot              as     plt
from   copy                           import copy, deepcopy

import LogClass
from FileIO.constants import _COMMENT, _HEADERINDICATOR, _KEYINDICATOR, _DefaulHeader


#############################################################################
# constants
#############################################################################
_CLASSNAME       = "FileData"

#############################################################################
# classes
#############################################################################       
[docs] class FileNameData : """ Class to parse file-names of Amplifier data """ def __init__ ( self, fileName = "", log = None ) : """ get components from filename """ if log == None: self.log = LogClass.LogClass(_CLASSNAME) else: self.log = deepcopy(log) # Logging class used for all in and output messages self.log.addLabel(self) # split file name in path, base-name and file extension fname = fileName.split('/') self.path = '/'.join(fname[0:-1]) + "/" # extract file extension if available name = fname[-1].split('.') if len(name) > 1 : self.ext = name[-1] self.baseName = '.'.join(name[0:-1]) else : self.ext = "" self.baseName = name[0] # extract Redmine version number if available v = self.baseName.split('_DMSFVersion_') if len(v) > 1 : self.Version = v[-1] self.baseName = v[0] try: self.Version = float(v[-1]) except: self.Version = -1. else: self.Version = -1
[docs] def printOut ( self, line, message_type, noSpace = True, lineFeed = True ) : """ Wrapper for the printOut method of the actual log class """ self.log.printOut(line, message_type, noSpace, lineFeed)
[docs] class FileData : """ This module (and specifically this class) provides a simple interface for other modules to read and write data to files. The header information and file structure only need to be passed once. Subsequent read and write operations can be performed by using the methods readData and writeData. """ def __init__ ( self, log = None, headerProfile = _DefaulHeader ): """ Initialize class. """ if log == None: self.log = LogClass.LogClass(_CLASSNAME) else: self.log = deepcopy(log) # Logging class used for all in and output messages self.log.addLabel(self) self.Name = "" # Name of the profile self.Comment = _COMMENT # comment sign self.KeyIndicator = _KEYINDICATOR # sign indicating the keyword-end self.HeaderLine = _HEADERINDICATOR # line to start / stop the header self.Empty = 1 # number of empty lines after header line self.upperCase = True self.addUnknown = False self.delmitter = None self.accuracy = None self.length = None self.scientific = False self.writeFunction = None self.readFunction = None self.headerProfile = headerProfile # parse header profile newDict = {} for k in headerProfile : # parse special keywords for configuration if k == "_NAME" : self.Name = headerProfile["_NAME"] continue if k == "_COMMENT" : self.Comment = headerProfile["_COMMENT"] continue if k == "_KEYINDICATOR" : self.KeyIndicator = headerProfile["_KEYINDICATOR"] continue if k == "_HEADERINDICATOR" : self.HeaderLine = headerProfile["_HEADERINDICATOR"] continue if k == "_EMPTY" : self.Empty = headerProfile["_EMPTY"] continue if k == "_UPPERCASE" : self.upperCase = headerProfile["_UPPERCASE"] continue if k == "_ADDUNKNOWN" : self.addUnknown = headerProfile["_ADDUNKNOWN"] continue if k == "_DELMITTER" : self.delmitter = headerProfile["_DELMITTER"] continue if k == "_ACCURACY" : self.accuracy = headerProfile["_ACCURACY"] continue if k == "_LENGTH" : self.length = headerProfile["_LENGTH"] continue if k == "_SCIENTIFIC" : self.scientific = True continue if k == "_ReadFunction" : self.readFunction = headerProfile["_ReadFunction"] continue if k == "_WriteFunction" : self.writeFunction = headerProfile["_WriteFunction"] continue if ( k[0] != '_' ) or ( k.upper().find("COMMENT") > -1 ): if self.upperCase : newDict[k.upper() + self.KeyIndicator] = headerProfile[k] else : newDict[k + self.KeyIndicator] = headerProfile[k] self.printOut("Successfully implemented header profile '%s'" % self.Name, self.log.FLOW_TYPE) self.profile = newDict # directory with the header profile
[docs] def printOut ( self, line, message_type, noSpace = True, lineFeed = True ) : """ Wrapper for the printOut method of the actual log class """ self.log.printOut(line, message_type, noSpace, lineFeed)
[docs] def createEmptyResult ( self ) : """ creates an empty result dictionary with all header entries set to default values. Returns: resultDict (dict): empty result dictionary with all header entries set to default values """ resultDict = {} for p in self.profile: if p.upper().find("_COMMENT") > -1 : # filter comment lines continue parseList = self.profile[p] answerList = [] for idChar in parseList[1] : if ( idChar == "U" ) or ( idChar == "S" ) : answerList.append( "UNDEFINED" ) break if ( idChar == "F" ) : answerList.append( 0.0 ) break if ( idChar == "D" ) : answerList.append( 0 ) if ( idChar == "s" ) : answerList.append( "UNDEFINED" ) if ( idChar == "f" ) : answerList.append( 0.0 ) if ( idChar == "d" ) : answerList.append( 0 ) resultDict[p] = answerList return resultDict
[docs] def parseLine ( self, line ): """ Parsing a given string according to the header. Parameters: line (string): The line to parse Returns: tuple (tuple): boolean flag indicating success or failure, Keyword-string ("ERROR:" in case of an error, "WARNING:" in case of a warning), list of parameters (list with one string holding the error message in case of an error) """ answerList = [] # list with all parsed parameters line = line.strip() # remove CR / NL / EOF from line end if ( len(line) < 1 ) or ( line[0] != self.Comment ) : # check for empty line or data contend line return True, "", [] line = line[1:] # remove leading comment columns = line.split() # split line if len(columns) < 2 : # check for empty comment line return True, "", [] # check for valid key-word if self.upperCase: KeyWord = columns[0].upper() else : KeyWord = columns[0] self.printOut("Searching for key-word >%s<" % columns[0] , self.log.DEBUG_TYPE) if KeyWord in self.profile : # valid key-word detected self.printOut(" detected in profile", self.log.DEBUG_TYPE, False) parseList = self.profile[ KeyWord ] # check number of available parameters if len( columns )-1 < len( parseList[1] ): return False, "ERROR:", ["Not enough parameter"] # loop all requested parameters cCount = 1 for p in parseList[1]: if p == 'U' : columns = line.split(maxsplit = 1) UString = columns[1].split(self.Comment)[0].strip() return True, KeyWord, [ UString ] if p == 'S' : while ( cCount < len(columns) ) and ( len(columns[cCount]) > 1 ) and ( columns[cCount][0] != self.Comment ) : answerList.append(columns[cCount]) cCount += 1 return True, KeyWord, answerList if p == 's' : answerList.append(columns[cCount]) cCount += 1 if p == 'F' : while ( cCount < len(columns) ) and ( len(columns[cCount]) > 1 ) and ( columns[cCount][0] != self.Comment ) : try: f = float(columns[cCount]) answerList.append(f) except: return False, "ERROR:", ["Parameter %d is not a float value!" % ( cCount - 1 ) ] cCount += 1 return True, KeyWord, answerList if p == 'f' : try: f = float(columns[cCount]) answerList.append(f) except: return False, "ERROR:", ["Parameter %d is not a float value!" % ( cCount - 1 ) ] cCount += 1 if p == 'D' : while ( cCount < len(columns) ) and ( len(columns[cCount]) > 1 ) and ( columns[cCount][0] != self.Comment ) : try: d = int(columns[cCount]) answerList.append(f) except: return False, "ERROR:", ["Parameter %d is not an integer value!" % ( cCount - 1 ) ] cCount += 1 return True, KeyWord, answerList if p == 'd' : try: d = int(columns[cCount]) answerList.append(d) except: return False, "ERROR:", ["Parameter %d is not an integer value!" % ( cCount - 1 ) ] cCount += 1 return True, KeyWord, answerList # check for unknown keywords if KeyWord[-1] == ':' : if self.addUnknown : self.printOut(" adding unknown keyword to result", self.log.DEBUG_TYPE, False) columns = line.split(maxsplit = 1) UString = columns[1].split(self.Comment)[0] return True, KeyWord, [ UString ] else: return False, "WARNING:", ["Unknown keyword >%s< detected" % columns[0]] # seems to be a comment line return True, "", []
[docs] def parseHeader ( self, filename ): """ Getting all header data from file with filename 'filename'. Parameters: filename (string): path of file Returns: dictionary (dict): dictionary of results """ self.printOut("Parsing header of file '%s'" % filename, self.log.FLOW_TYPE) with open(filename, 'r', encoding='latin-1' ) as fh : # open file for reading resultDict = self.createEmptyResult() for line in fh : # and loop all lines in the file flag, key, resultList = self.parseLine( line ) if flag : if len(resultList) > 0 : resultDict[key] = resultList else : if key == "ERROR:" : self.printOut(" " + key + " " + resultList[0], self.log.ERROR_TYPE, False) else : self.printOut(" " + key + " " + resultList[0], self.log.WARNING_TYPE, False) fh.close() return self._translateDict(resultDict)
def _translateDict ( self, inputDict, Direction = True) : """ Create an output dict using the key-word translations in the profile. Parameters: inputDict (dict): dictionary to translate Direction (boolean): flag for forwards translation (True = forwards, False = backwards) Returns: outputDict (dict): translated dictionary """ self.printOut("Translating dictionary", self.log.FLOW_TYPE, False) self.printOut(" using profile %s" % self.Name, self.log.DEBUG_TYPE, False) profile = {} for key in self.profile: if key[0] == "_" : continue profile[key] = deepcopy( self.profile[key] ) # reverse translation or not? if not Direction : self.printOut(" translating backwards to profile", self.log.DEBUG_TYPE, False) newProfile = {} # establish new profile dictionary for key in profile: # loop all keys in actual profile newKey = profile[key][2] # extract translation key if ( newKey == "" ) : # if translation key is empty, newProfile[key] = profile[key] # use actual profile entry else : # else parts = newKey.split(self.KeyIndicator) # split new-key in key-section and value section newKey = parts[0] + self.KeyIndicator # re-compose transation key # allow for multiple keys of the same keyword to compose answer d = 0 # set key-counter while ( newKey + "%d" %d ) in newProfile: d+= 1 # check up to which counter the key is already in the new profile and establish therewith an unused key newKey = newKey + "%d" %d newArrange = [] # empty value translation string if len( parts ) > 1 : # check if value translation is given and reverse it if so for i, n in enumerate(parts[1]) : # loop all parts of the value-translation string if n == '-' : continue n = int(n) # extract number belonging to the actual position while len(newArrange) <= n : # append positions in new value translation string newArrange.append("-") i = ("%d" %i)[0] # and finaly set value position number newArrange[n] = i newProfile[newKey] = profile[key] # set new profile entry newProfile[newKey][2] = key for a in newArrange : newProfile[newKey][2] += a profile = newProfile # set translation profile to reverted profile # translate dictionary using the established profile # establish key-translation dictionary keyDict = {} # establish a key dictionary with all entries of the profile belonging to one key-word for k in profile : # loop all keys in the profile key = k.split(self.KeyIndicator)[0] + self.KeyIndicator # compare only part before key-identifier (here ':' ) if not key in keyDict : # establish key if not already present keyDict[key] = [] keyDict[key].append(k) # and add actual key + expansion number to list of belonging keys # translate dictionary outputDict = {} # create an empty output dictionary for k in inputDict : # loop all entries of the input dictionary keyValues = deepcopy( inputDict[k] ) # get all values belonging to the key translatedKey = k # pre-set translated key (in case translation is not necessary) newKeyValues = keyValues # and translated key-values #self.printOut("checking Key %s" % k, self.log.DEBUG_TYPE, False) if k in keyDict: # if key is in the profile : for key in keyDict[k] : # loop all keys in the profile belonging to this dictionary entry # translate Key parts = [] #self.printOut(" checking sub-Key %s" % key, self.log.DEBUG_TYPE, False) newKey = profile[key][2] # extract translated key if newKey != "" : # check if translation is required parts = newKey.split(self.KeyIndicator) # if so extract new key and value assign string if len(parts) > 0 : translatedKey = parts[0] + self.KeyIndicator if not ( translatedKey in outputDict ) : # add translated key to output dictionary if not already present outputDict[translatedKey] = [] if ( len( parts ) > 1 ) and ( len( parts[1] ) > 0 ) : # check if value re-aranging is required newKeyValues = outputDict[translatedKey] # get existing values for i, n in enumerate( parts[1] ): # loop all positions in value assign string try: if n != '-' : # check if position is unused n = int(n) # if not, extract integer from character while n >= len(newKeyValues): # make sure, values exist up to the indicated number newKeyValues.append("") newKeyValues[n] = keyValues[i] # set value correspondingto assign string position and value number except: self.printOut("Can not sort result list correctly-- error in profile??", self.log.ERROR_TYPE, False) self.printOut(" found Key %s" % translatedKey, self.log.DEBUG_TYPE, False) outputDict[translatedKey] = newKeyValues # set output dictionary entry return outputDict def _printValue ( self, form, value, length = 10, acc = 4 ) : """ internal method: print value to string """ if ( ( form.upper() == "D" ) ) and isinstance( value, ( float, int ) ): s = "%d" % value elif ( ( form.upper() == "F" ) or ( form.upper() == "" ) ) and isinstance( value, ( float, int ) ): f = "%." + "%d" % acc + "f" s = f % value #s +="0"*(acc-len(s.split('.')[1])) elif ( form.upper() == "E" ) and isinstance( value, ( float, int ) ): f = "%." + "%d" % acc + "e" s = f % value #s +="0"*(acc-len(s.split('.')[1])) elif ( ( form.upper() == "S" ) or ( form.upper() == "U" ) or ( form.upper() == "" ) ) and isinstance( value, str ): s = value else: s = "" self.printOut(" no writing rutine for this data type", self.log.WARNING_TYPE, False) length = int(length) if (abs(length) < len(s)) and (length != 0): if form.upper() == "S" : return s[0 : length] if form.upper() == "F": if len(s.split(".")[0]) < length : return s[0 : length] else : if ( form.upper() == "S" ) or ( form.upper() == "U" ) : s = s + " "*(length - len(s)) else: s = " "*(length - len(s)) + s return s
[docs] def writeHeader ( self, filename, resultDict ): """ Writing a header with all the contend given in the resultDictionary Parameters: filename (string): name of the file to write resultDict (dictionary): dictionary with keywords and values to write """ def writeLine(key, values, maxKey, form = "U", comment = "") : """ create a formatted output line string """ outString = self.Comment + " " + key # write key outString += " " * ( maxKey - len( key ) ) # write spaces for i in range(0, len( values ) ) : # loop list of values/items assosiated with key if i < len(form) : if (form[i] == "U") or (form[i] == "S") or (form[i] == "F"): f = deepcopy(form[i]) while i < len( values ): outString += self._printValue( f, values[i] ) + " " i+= 1 else : outString += self._printValue( form[i], values[i] ) + " " # write header entry comment if available if ( isinstance( comment, str ) ) and ( comment != "" ) : if len( outString ) < 80 : outString += " " * ( 80 - len( outString ) ) outString += self.Comment + " " + comment return outString + "\n" self.printOut("Writing header to file '%s'" % filename, self.log.FLOW_TYPE) ResultDict = self._translateDict(resultDict, Direction = False) # get maximum length of the key-entry: maxKey = 0 for key in ResultDict : if len(key) > maxKey : maxKey = len(key) maxKey += 1 with open(filename, 'w') as fh : # open file for reading fh.write( self.Comment + self.HeaderLine + "\n" ) # write line for header start for i in range(0, self.Empty): # write empty comment lines fh.write( self.Comment + "\n") fh.write( self.Comment + " " + self.Name + "\n" ) written = {} # dictionary of written keys for key in self.profile : if key.upper().find("_COMMENT") > -1 : # write comment line fh.write( self.Comment + "\n" ) fh.write( self.Comment + " " + self.profile[key] + "\n" ) continue if key in ResultDict : written[key] = True form = self.profile[key][1] # get key format string comment = "" # get comment if len( self.profile[key] ) > 3 : comment = self.profile[key][3] fh.write( writeLine( key, ResultDict[key], maxKey, form, comment ) ) # write line if self.addUnknown: # write unknown keywords if required fh.write( self.Comment + "\n" ) fh.write( self.Comment + " Other keywords:\n" ) for key in ResultDict : # loop dictionary if not ( key in written ): fh.write( writeLine( key, ResultDict[key], maxKey ) ) # write line #for key in ResultDict : # loop dictionary #outString = "" #if ResultDict[key] == None : #continue #if not (key in self.profile ): # check if key is in the profile #if self.addUnknown: #form = "U" #else : #self.printOut(" Key >%s< is unknown to the actual profile!" % key, self.log.WARNING_TYPE, False) #continue #else : #form = self.profile[key][1] # get key format string #outString += self.Comment + " " + key # write key #outString += " " * ( maxKey - len( key ) ) # write spaces #for i in range(0, len( ResultDict[key] ) ) : # loop list of values/items assosiated with key #if i < len(form) : #if (form[i] == "U") or (form[i] == "S") or (form[i] == "F"): #f = deepcopy(form[i]) #while i < len(ResultDict[key]): #outString += self._printValue( f, ResultDict[key][i] ) + " " #i+= 1 #else : #outString += self._printValue( form[i], ResultDict[key][i] ) + " " ## write header entry comment if available #if ( len( self.profile[key] ) > 3 ) and ( isinstance( self.profile[key][3], str ) ) and ( self.profile[key][3] != "" ) : #if len( outString ) < 80 : #outString += " " * ( 80 - len( outString ) ) #outString += self.Comment + " " + self.profile[key][3] #fh.write(outString + "\n") # end line for i in range(0, self.Empty): # write empty comment lines fh.write( self.Comment + "\n") fh.write(self.Comment + self.HeaderLine + "\n") # write line for header end for i in range(0, self.Empty): # write empty comment lines fh.write( self.Comment + "\n") fh.close()
[docs] def writeData ( self, filename, data, header = None, delmitter = " ", accuracy = 4, length = 10, dataDescription = "" ) : """ writes data array to disk. Parameters: filename (string): path of the file to write to data (2D-array): array containing the data header (dict): header data; if None, no header is written delmitter (string): characters between the entries accuracy (int): number of digits for float numbers length (int): field length (minimal) dataDescription (string): data line description string, if empty, nothing is written """ # overwrite given values with profile values if existant: if self.delmitter != None : delmitter = self.delmitter if self.accuracy != None : accuracy = self.accuracy if self.length != None : length = self.length writeFlag = "a" try: if header == None: writeFlag = "w" except: pass if writeFlag == "a" : self.writeHeader(filename, header) self.printOut("Write data to file '%s'" % filename, self.log.FLOW_TYPE) with open(filename, writeFlag) as fh : if dataDescription != "" : fh.write( self.Comment + "\n" ) fh.write( self.Comment + dataDescription + "\n" ) for lData in data : line = False if self.writeFunction != None : # translate data-point if translation function is given lineData = self.writeFunction( lData ) else: lineData = lData for item in lineData: if line : fh.write(delmitter) if isinstance (item, int) : fh.write(self._printValue("D", item, length, accuracy) ) elif self.scientific : fh.write(self._printValue("E", item, length, accuracy) ) else: fh.write(self._printValue("", item, length, accuracy) ) line = True fh.write("\n") fh.close()
[docs] def readData ( self, filename, delmitter = None, noColumns = 0, tryFloat = True ) : """ Reads data array from disk. Parameters: filename (string): path of the file to write to delmitter (string): characters between the entries NoColumns (int): maximal number of columns to consider tryFloat (boolean): indicating if a pure string array is returned (False) or if the entries are tried to be interpreted as floats (True) Returns: tuple (tuple): headerDict (dict with header data), data (array with data) """ # NOTE: this might need to be changed. is everything correct with "delmitter"? if ( type( self.delmitter ) != type( None ) ) and ( type( delmitter ) == type( None ) ): delmitter = self.delmitter headerDict = self.parseHeader ( filename ) self.printOut("Reading data from file '%s'" % filename, self.log.FLOW_TYPE) data = [] with open(filename, "r", encoding='latin-1') as fh : ##### errors = 'replace' for line in fh : # and loop all lines in the file dLine = [] line = line.strip() # remove CR / NL / EOF from line end if ( len(line) < 1 ) or ( line[0] == self.Comment ) : # check for empty line or data contend line continue if delmitter != " " : columns = line.split(delmitter) else : columns = line.split() for c in columns: entry = c.strip() if tryFloat : try : entry = float(entry) dLine.append(entry) except : pass else: dLine.append(entry) if len( dLine ) < noColumns : # skip incomplete lines continue if self.readFunction != None : # translate line entries if translation function is given data.append( self.readFunction( dLine ) ) else: data.append( dLine ) fh.close() return headerDict, data