Source code for src.Libs.FileIO.FileDataClass
#!/usr/bin/env python3
"""
This file is part of the PAFFrontendSim.
The PAFFrontendSim is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License v3 as published by the Free Software Foundation.
The PAFFrontendSim is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the PAFFrontendSim. If not, see https://www.gnu.org/licenses/.
"""
#############################################################################
# import libs
#############################################################################
import numpy as np
import matplotlib.pyplot as plt
from copy import copy, deepcopy
import LogClass
from FileIO.constants import _COMMENT, _HEADERINDICATOR, _KEYINDICATOR, _DefaulHeader
#############################################################################
# constants
#############################################################################
_CLASSNAME = "FileData"
#############################################################################
# classes
#############################################################################
[docs]
class FileNameData:
    """
    Split the file name of an amplifier data file into its components.

    Attributes:
        log: private logger (copy of the given one, or a new LogClass instance)
        path (str): directory part of the name, always terminated by "/"
            (a name without any directory therefore yields "/")
        ext (str): text after the last "." of the file name ("" without a dot)
        baseName (str): file name without extension and without version suffix
        Version (float or int): number following "_DMSFVersion_";
            -1 if the marker is missing, -1.0 if the text after it is no number
    """

    def __init__(self, fileName="", log=None):
        """
        Extract path, base name, extension and version from 'fileName'.

        Parameters:
            fileName (string): file name, directories separated by "/"
            log (LogClass): logger to copy; a new one is created if None
        """
        if log is None:
            self.log = LogClass.LogClass(_CLASSNAME)
        else:
            # NOTE(review): addLabel is assumed to belong to this branch only;
            # the nesting was not recoverable from the flattened source.
            self.log = deepcopy(log)
            self.log.addLabel(self)

        # directory part: everything in front of the last "/"
        directory, _, leafName = fileName.rpartition('/')
        self.path = directory + "/"

        # extension: everything behind the last "." of the remaining name
        stem, dot, suffix = leafName.rpartition('.')
        if dot:
            self.ext = suffix
            self.baseName = stem
        else:
            self.ext = ""
            self.baseName = leafName

        # version number: text behind the (last) "_DMSFVersion_" marker
        head, *versionParts = self.baseName.split('_DMSFVersion_')
        if versionParts:
            self.baseName = head
            try:
                self.Version = float(versionParts[-1])
            except ValueError:
                self.Version = -1.
        else:
            self.Version = -1

    def printOut(self, line, message_type, noSpace=True, lineFeed=True):
        """
        Forward a message to the printOut method of the logger of this object.
        """
        self.log.printOut(line, message_type, noSpace, lineFeed)
[docs]
class FileData :
"""
This module (and specifically this class) provides a simple interface for other modules to read and write data to files.
The header information and file structure only need to be passed once.
Subsequent read and write operations can be performed by using the methods readData and writeData.
"""
def __init__(self, log=None, headerProfile=_DefaulHeader):
    """
    Initialize the class from a header profile.

    Keys of the profile starting with "_" configure the class, every other
    key (and every key containing "COMMENT") describes one header entry.
    The configuration keys are evaluated first, so the registered header
    keys do not depend on the order of the entries in the profile.

    Parameters:
        log (LogClass): logger to copy; a new one is created if None
        headerProfile (dict): header profile to implement
    """
    if log is None:
        self.log = LogClass.LogClass(_CLASSNAME)
    else:
        # NOTE(review): addLabel is assumed to belong to this branch only;
        # the nesting was not recoverable from the flattened source.
        self.log = deepcopy(log)            # Logging class used for all in and output messages
        self.log.addLabel(self)

    self.Name = ""                          # Name of the profile
    self.Comment = _COMMENT                 # comment sign
    self.KeyIndicator = _KEYINDICATOR       # sign indicating the keyword-end
    self.HeaderLine = _HEADERINDICATOR      # line to start / stop the header
    self.Empty = 1                          # number of empty lines after header line
    self.upperCase = True                   # convert keywords to upper case
    self.addUnknown = False                 # keep keywords that are not in the profile
    self.delmitter = None                   # column separator (overrides method arguments)
    self.accuracy = None                    # number of digits (overrides method arguments)
    self.length = None                      # field length (overrides method arguments)
    self.scientific = False                 # write floats in exponential notation
    self.writeFunction = None               # optional translation of a data row before writing
    self.readFunction = None                # optional translation of a data row after reading
    self.headerProfile = headerProfile

    # configuration keyword -> attribute receiving the profile value
    configAttributes = {
        "_NAME": "Name",
        "_COMMENT": "Comment",
        "_KEYINDICATOR": "KeyIndicator",
        "_HEADERINDICATOR": "HeaderLine",
        "_EMPTY": "Empty",
        "_UPPERCASE": "upperCase",
        "_ADDUNKNOWN": "addUnknown",
        "_DELMITTER": "delmitter",
        "_ACCURACY": "accuracy",
        "_LENGTH": "length",
        "_ReadFunction": "readFunction",
        "_WriteFunction": "writeFunction",
    }

    # first pass: configuration, so that upperCase / KeyIndicator are final
    # before any header key is registered
    for k, attribute in configAttributes.items():
        if k in headerProfile:
            setattr(self, attribute, headerProfile[k])
    if "_SCIENTIFIC" in headerProfile:
        # the mere presence of the key switches the mode on, its value is not evaluated
        self.scientific = True

    # second pass: header entries and comment lines, in profile order
    newDict = {}
    for k in headerProfile:
        if k in configAttributes or k == "_SCIENTIFIC":
            continue
        if not k.startswith('_') or "COMMENT" in k.upper():
            if self.upperCase:
                newDict[k.upper() + self.KeyIndicator] = headerProfile[k]
            else:
                newDict[k + self.KeyIndicator] = headerProfile[k]
        # any other key starting with "_" is ignored

    self.printOut("Successfully implemented header profile '%s'" % self.Name, self.log.FLOW_TYPE)
    self.profile = newDict                  # directory with the header profile
[docs]
def printOut(self, line, message_type, noSpace=True, lineFeed=True):
    """
    Forward a message to the printOut method of the logger of this object.

    Parameters:
        line (string): message text
        message_type: message type constant of the logger
        noSpace (boolean): passed on unchanged to the logger
        lineFeed (boolean): passed on unchanged to the logger
    """
    logger = self.log
    logger.printOut(line, message_type, noSpace, lineFeed)
[docs]
def createEmptyResult(self):
    """
    Create a result dictionary holding a default value for every header entry.

    The format string of each profile entry (element 1) is evaluated character
    by character: "s"/"f"/"d" add one placeholder each, the list formats
    "U"/"S"/"F"/"D" add one placeholder and end the entry, because parseLine
    stops evaluating the format string at these characters as well.
    Comment lines of the profile are skipped.

    Returns:
        resultDict (dict): header key -> list of default values
            ("UNDEFINED" for strings, 0.0 for floats, 0 for integers)
    """
    listDefaults = {"U": "UNDEFINED", "S": "UNDEFINED", "F": 0.0, "D": 0}
    singleDefaults = {"s": "UNDEFINED", "f": 0.0, "d": 0}

    resultDict = {}
    for key, entry in self.profile.items():
        if "_COMMENT" in key.upper():               # filter comment lines
            continue
        answerList = []
        for idChar in entry[1]:
            if idChar in listDefaults:
                answerList.append(listDefaults[idChar])
                break                               # nothing is parsed behind a list format
            if idChar in singleDefaults:
                answerList.append(singleDefaults[idChar])
        resultDict[key] = answerList
    return resultDict
[docs]
def parseLine(self, line):
    """
    Parse one line of a file according to the header profile.

    Only lines starting with the comment sign and holding at least a keyword
    and one further column are evaluated. The format string of the matching
    profile entry decides how the columns are converted:
    "s"/"f"/"d" take one string / float / integer column,
    "S"/"F"/"D" take all remaining columns up to a column starting with the
    comment sign, "U" takes the rest of the line (up to the comment sign) as
    one string.

    Parameters:
        line (string): The line to parse

    Returns:
        tuple (tuple): boolean flag indicating success or failure,
            Keyword-string ("ERROR:" in case of an error, "WARNING:" in case
            of a warning, "" if the line holds no keyword),
            list of parameters (list with one string holding the message in
            case of an error or warning)
    """
    line = line.strip()                                 # remove CR / NL / EOF from line end
    if len(line) < 1 or line[0] != self.Comment:        # empty line or data line
        return True, "", []
    line = line[1:]                                     # remove leading comment sign
    columns = line.split()
    if len(columns) < 2:                                # empty comment line or keyword without value
        return True, "", []

    KeyWord = columns[0].upper() if self.upperCase else columns[0]
    self.printOut("Searching for key-word >%s<" % columns[0], self.log.DEBUG_TYPE)

    if KeyWord in self.profile:
        self.printOut(" detected in profile", self.log.DEBUG_TYPE, False)
        formatString = self.profile[KeyWord][1]
        if len(columns) - 1 < len(formatString):
            return False, "ERROR:", ["Not enough parameter"]

        # lower-case format character -> (conversion, message if it fails)
        converters = {
            's': (str, ""),
            'f': (float, "Parameter %d is not a float value!"),
            'd': (int, "Parameter %d is not an integer value!"),
        }
        answerList = []                                 # list with all parsed parameters
        cCount = 1                                      # index of the next column to convert
        for p in formatString:
            if p == 'U':
                remainder = line.split(maxsplit=1)[1]
                return True, KeyWord, [remainder.split(self.Comment)[0].strip()]
            if p.lower() not in converters:             # unknown format characters are ignored
                continue
            convert, errorText = converters[p.lower()]
            isList = p.isupper()
            while cCount < len(columns):
                # a list ends at a trailing comment; one-character values
                # such as "5" are regular list members
                if isList and columns[cCount][0] == self.Comment:
                    break
                try:
                    answerList.append(convert(columns[cCount]))
                except ValueError:
                    return False, "ERROR:", [errorText % (cCount - 1)]
                cCount += 1
                if not isList:
                    break
            if isList:                                  # a list format always ends the line
                return True, KeyWord, answerList
        return True, KeyWord, answerList

    # check for unknown keywords
    # NOTE(review): ':' is hard-coded here although the key indicator is
    # configurable via the profile -- confirm whether self.KeyIndicator is meant.
    if KeyWord[-1] == ':':
        if self.addUnknown:
            self.printOut(" adding unknown keyword to result", self.log.DEBUG_TYPE, False)
            UString = line.split(maxsplit=1)[1].split(self.Comment)[0]
            return True, KeyWord, [UString]
        return False, "WARNING:", ["Unknown keyword >%s< detected" % columns[0]]

    # seems to be a comment line
    return True, "", []
[docs]
def parseHeader(self, filename):
    """
    Collect all header data of the file 'filename'.

    Every line of the file is passed to parseLine; successfully parsed
    keywords replace the defaults of createEmptyResult, errors and warnings
    are reported via the logger. The collected dictionary is passed through
    _translateDict before it is returned.

    Parameters:
        filename (string): path of file

    Returns:
        dictionary (dict): dictionary of results
    """
    self.printOut("Parsing header of file '%s'" % filename, self.log.FLOW_TYPE)
    with open(filename, 'r', encoding='latin-1') as fh:
        resultDict = self.createEmptyResult()
        for line in fh:
            success, key, values = self.parseLine(line)
            if success:
                if values:                      # lines without keyword return an empty list
                    resultDict[key] = values
                continue
            # on failure 'key' holds the severity and 'values' the message
            severity = self.log.ERROR_TYPE if key == "ERROR:" else self.log.WARNING_TYPE
            self.printOut(" " + key + " " + values[0], severity, False)
    return self._translateDict(resultDict)
def _translateDict ( self, inputDict, Direction = True) :
    """
    Create an output dict using the key-word translations in the profile.

    Element 2 of a profile entry is the translation string. An empty string
    means "keep key and values". Otherwise it is split at the key indicator:
    the part in front is the new key, each character behind it belongs to one
    input value (in order) and is either a digit -- the position of that value
    in the output list -- or '-' for a value that is not copied. Several input
    keys may point to the same output key; their values are merged into one
    list, missing positions are filled with "".
    Keys of 'inputDict' that are not part of the profile are copied unchanged.

    For the backward direction the profile is inverted first: the translated
    key (extended by a running number, as several profile keys may share it)
    becomes the profile key and the translation string is rebuilt so that it
    points back to the original key and value positions.

    Parameters:
        inputDict (dict): dictionary to translate
        Direction (boolean): flag for forwards translation (True = forwards, False = backwards)

    Returns:
        outputDict (dict): translated dictionary
    """
    self.printOut("Translating dictionary", self.log.FLOW_TYPE, False)
    self.printOut(" using profile %s" % self.Name, self.log.DEBUG_TYPE, False)
    # work on a copy of the profile without the entries starting with "_" (comment lines)
    profile = {}
    for key in self.profile:
        if key[0] == "_" : continue
        profile[key] = deepcopy( self.profile[key] )
    # reverse translation or not?
    if not Direction :
        self.printOut(" translating backwards to profile", self.log.DEBUG_TYPE, False)
        newProfile = {}                                                 # establish new profile dictionary
        for key in profile:                                             # loop all keys in actual profile
            newKey = profile[key][2]                                    # extract translation key
            if ( newKey == "" ) :                                       # if translation key is empty,
                newProfile[key] = profile[key]                          # use actual profile entry
            else :                                                      # else
                parts = newKey.split(self.KeyIndicator)                 # split new key in key section and value section
                newKey = parts[0] + self.KeyIndicator                   # re-compose translation key
                # allow for multiple keys of the same keyword to compose answer
                d = 0                                                   # set key-counter
                while ( newKey + "%d" %d ) in newProfile: d+= 1         # find the first counter that is not yet used for this key
                newKey = newKey + "%d" %d
                newArrange = []                                         # reversed value-position list
                if len( parts ) > 1 :                                   # check if value translation is given and reverse it if so
                    for i, n in enumerate(parts[1]) :                   # loop all characters of the value-translation string
                        if n == '-' : continue                          # value is not used
                        n = int(n)                                      # output position belonging to input position i
                        while len(newArrange) <= n :                    # make sure the reversed list reaches up to position n
                            newArrange.append("-")
                        i = ("%d" %i)[0]                                # only the first digit is kept, i.e. positions 0-9 are supported
                        newArrange[n] = i                               # and finally set the value position number
                newProfile[newKey] = profile[key]                       # set new profile entry
                newProfile[newKey][2] = key                             # the original key (incl. key indicator) is the new target
                for a in newArrange :
                    newProfile[newKey][2] += a
        profile = newProfile                                            # set translation profile to reverted profile
    # translate dictionary using the established profile
    # establish key-translation dictionary
    keyDict = {}                                                        # key-word -> list of all profile keys belonging to it
    for k in profile :                                                  # loop all keys in the profile
        key = k.split(self.KeyIndicator)[0] + self.KeyIndicator        # compare only the part up to the key indicator (drops the running number)
        if not key in keyDict :                                         # establish key if not already present
            keyDict[key] = []
        keyDict[key].append(k)                                          # and add actual key + expansion number to list of belonging keys
    # translate dictionary
    outputDict = {}                                                     # create an empty output dictionary
    for k in inputDict :                                                # loop all entries of the input dictionary
        keyValues = deepcopy( inputDict[k] )                            # get all values belonging to the key
        translatedKey = k                                               # pre-set translated key (in case translation is not necessary)
        newKeyValues = keyValues                                        # and translated key-values
        #self.printOut("checking Key %s" % k, self.log.DEBUG_TYPE, False)
        if k in keyDict:                                                # if key is in the profile :
            for key in keyDict[k] :                                     # loop all keys in the profile belonging to this dictionary entry
                # translate Key
                parts = []
                #self.printOut(" checking sub-Key %s" % key, self.log.DEBUG_TYPE, False)
                newKey = profile[key][2]                                # extract translated key
                if newKey != "" :                                       # check if translation is required
                    parts = newKey.split(self.KeyIndicator)             # if so extract new key and value assign string
                if len(parts) > 0 :
                    translatedKey = parts[0] + self.KeyIndicator
                    if not ( translatedKey in outputDict ) :            # add translated key to output dictionary if not already present
                        outputDict[translatedKey] = []
                    if ( len( parts ) > 1 ) and ( len( parts[1] ) > 0 ) :   # check if value re-arranging is required
                        newKeyValues = outputDict[translatedKey]        # get existing values (same list object, it is filled in place)
                        for i, n in enumerate( parts[1] ):              # loop all positions in value assign string
                            try:
                                if n != '-' :                           # check if position is unused
                                    n = int(n)                          # if not, extract integer from character
                                    while n >= len(newKeyValues):       # make sure, values exist up to the indicated number
                                        newKeyValues.append("")
                                    newKeyValues[n] = keyValues[i]      # set value corresponding to assign string position and value number
                            except:
                                self.printOut("Can not sort result list correctly-- error in profile??", self.log.ERROR_TYPE, False)
                # NOTE(review): nesting of this debug message was not recoverable from the flattened source -- confirm
                self.printOut(" found Key %s" % translatedKey, self.log.DEBUG_TYPE, False)
        outputDict[translatedKey] = newKeyValues                        # set output dictionary entry (also for keys unknown to the profile)
    return outputDict
def _printValue(self, form, value, length=10, acc=4):
    """
    internal method: print value to string

    Numbers are accepted as Python int / float and as numpy integer /
    floating scalars (e.g. np.int64, np.float32).

    Parameters:
        form (string): format character, evaluated case-insensitively:
            "D" integer, "F" fixed-point, "E" exponential, "S" / "U" string;
            "" formats numbers as fixed-point and passes strings unchanged
        value: value to print
        length (int): field length; shorter results are padded with spaces
            (strings on the right, everything else on the left), 0 disables
            padding and truncation
        acc (int): number of digits behind the decimal point ("F", "E", "")

    Returns:
        s (string): formatted value; an empty (padded) field and a warning
            if 'value' does not fit to 'form'
    """
    kind = form.upper()
    # numpy scalars are in general no subclasses of int / float
    isNumber = isinstance(value, (float, int, np.floating, np.integer))

    if kind == "D" and isNumber:
        s = "%d" % value
    elif kind in ("F", "") and isNumber:
        s = ("%." + "%d" % acc + "f") % value
    elif kind == "E" and isNumber:
        s = ("%." + "%d" % acc + "e") % value
    elif kind in ("S", "U", "") and isinstance(value, str):
        s = value
    else:
        s = ""
        self.printOut(" no writing rutine for this data type", self.log.WARNING_TYPE, False)

    length = int(length)
    if abs(length) < len(s) and length != 0:
        # result is too long: only "S" strings and the decimals of "F"
        # numbers are cut, everything else is returned in full length
        if kind == "S":
            return s[0:length]
        if kind == "F" and len(s.split(".")[0]) < length:
            return s[0:length]
    elif kind in ("S", "U"):
        s = s + " " * (length - len(s))             # strings are left-aligned
    else:
        s = " " * (length - len(s)) + s             # everything else is right-aligned
    return s
[docs]
def writeHeader(self, filename, resultDict):
    """
    Write a header with all the content given in the result dictionary.

    The dictionary is translated backwards to the profile keys first. The
    entries are written in the order of the profile, comment lines of the
    profile are inserted in between. Keys that are not part of the profile
    are appended under "Other keywords:" if addUnknown is set.
    An existing file is overwritten.

    Parameters:
        filename (string): name of the file to write
        resultDict (dictionary): dictionary with keywords and values to write
    """
    # format characters standing for "all remaining values"
    listForms = ("U", "S", "F", "D")

    def writeLine(key, values, maxKey, form="U", comment=""):
        """
        create a formatted output line string
        """
        outString = self.Comment + " " + key                   # write key
        outString += " " * (maxKey - len(key))                 # align the values of all keys
        for i in range(min(len(values), len(form))):
            if form[i] in listForms:
                # a list format consumes all remaining values and ends the line
                for value in values[i:]:
                    outString += self._printValue(form[i], value) + " "
                break
            outString += self._printValue(form[i], values[i]) + " "
        # write header entry comment if available
        if isinstance(comment, str) and comment != "":
            if len(outString) < 80:
                outString += " " * (80 - len(outString))
            outString += self.Comment + " " + comment
        return outString + "\n"

    self.printOut("Writing header to file '%s'" % filename, self.log.FLOW_TYPE)
    ResultDict = self._translateDict(resultDict, Direction=False)

    # width of the key column: longest key plus one space
    maxKey = max((len(key) for key in ResultDict), default=0) + 1
    emptyLines = (self.Comment + "\n") * self.Empty            # block of empty comment lines

    # NOTE(review): the file is written with the platform default encoding
    # while parseHeader / readData read it as latin-1 -- confirm for non-ASCII text.
    with open(filename, 'w') as fh:                            # open file for writing
        fh.write(self.Comment + self.HeaderLine + "\n")        # write line for header start
        fh.write(emptyLines)
        fh.write(self.Comment + " " + self.Name + "\n")

        written = set()                                        # keys already written
        for key in self.profile:
            if key.upper().find("_COMMENT") > -1:              # write comment line
                fh.write(self.Comment + "\n")
                fh.write(self.Comment + " " + self.profile[key] + "\n")
                continue
            if key in ResultDict:
                written.add(key)
                entry = self.profile[key]
                comment = entry[3] if len(entry) > 3 else ""   # optional entry comment
                fh.write(writeLine(key, ResultDict[key], maxKey, entry[1], comment))

        if self.addUnknown:                                    # write unknown keywords if required
            fh.write(self.Comment + "\n")
            fh.write(self.Comment + " Other keywords:\n")
            for key in ResultDict:
                if key not in written:
                    fh.write(writeLine(key, ResultDict[key], maxKey))

        fh.write(emptyLines)
        fh.write(self.Comment + self.HeaderLine + "\n")        # write line for header end
        fh.write(emptyLines)
[docs]
def writeData(self, filename, data, header=None, delmitter=" ", accuracy=4, length=10, dataDescription=""):
    """
    writes data array to disk.

    If a header is given, it is written first (replacing an existing file)
    and the data are appended; without header the file is overwritten with
    the data only. Values for delimiter, accuracy and field length defined
    in the header profile take precedence over the arguments.
    Integers (Python or numpy) are written as integers, all other items as
    fixed-point or -- if the profile requests it -- exponential numbers.

    Parameters:
        filename (string): path of the file to write to
        data (2D-array): array containing the data
        header (dict): header data; if None, no header is written
        delmitter (string): characters between the entries
        accuracy (int): number of digits for float numbers
        length (int): field length (minimal)
        dataDescription (string): data line description string, if empty, nothing is written
    """
    # overwrite given values with profile values if existent
    if self.delmitter is not None:
        delmitter = self.delmitter
    if self.accuracy is not None:
        accuracy = self.accuracy
    if self.length is not None:
        length = self.length

    # identity test: '== None' is ambiguous for array-like headers
    if header is None:
        writeFlag = "w"
    else:
        self.writeHeader(filename, header)
        writeFlag = "a"                                    # keep the header just written

    self.printOut("Write data to file '%s'" % filename, self.log.FLOW_TYPE)
    with open(filename, writeFlag) as fh:
        if dataDescription != "":
            fh.write(self.Comment + "\n")
            fh.write(self.Comment + dataDescription + "\n")
        for lData in data:
            if self.writeFunction is not None:             # translate data-point if translation function is given
                lineData = self.writeFunction(lData)
            else:
                lineData = lData
            for index, item in enumerate(lineData):
                if index > 0:
                    fh.write(delmitter)
                if isinstance(item, (int, np.integer)):    # numpy integers are no subclasses of int
                    form = "D"
                elif self.scientific:
                    form = "E"
                else:
                    form = ""
                fh.write(self._printValue(form, item, length, accuracy))
            fh.write("\n")
[docs]
def readData(self, filename, delmitter=None, noColumns=0, tryFloat=True):
    """
    Reads header and data array from disk.

    Empty lines and lines starting with the comment sign are skipped. Each
    remaining line is split into columns; with tryFloat set, every column is
    converted to float and columns that are no numbers are dropped.

    Parameters:
        filename (string): path of the file to read from
        delmitter (string): characters between the entries; if None, the delimiter of the profile is used (if defined), " " and None split at any white space
        noColumns (int): minimum number of entries a line must hold, shorter lines are skipped
        tryFloat (boolean): indicating if a pure string array is returned (False) or if the entries are tried to be interpreted as floats (True)

    Returns:
        tuple (tuple): headerDict (dict with header data), data (list of rows; each row is passed through the read function of the profile if one is defined)
    """
    # the profile delimiter is only a fall-back for a missing argument
    if delmitter is None and self.delmitter is not None:
        delmitter = self.delmitter

    headerDict = self.parseHeader(filename)
    self.printOut("Reading data from file '%s'" % filename, self.log.FLOW_TYPE)

    data = []
    with open(filename, "r", encoding='latin-1') as fh:
        for rawLine in fh:
            text = rawLine.strip()                          # remove CR / NL / EOF from line end
            if not text or text[0] == self.Comment:         # empty line or header / comment line
                continue
            columns = text.split() if delmitter == " " else text.split(delmitter)
            entries = [column.strip() for column in columns]
            if tryFloat:
                dLine = []
                for entry in entries:
                    try:
                        dLine.append(float(entry))
                    except ValueError:                      # not a number: entry is dropped
                        pass
            else:
                dLine = entries
            if len(dLine) < noColumns:                      # skip incomplete lines
                continue
            if self.readFunction is None:
                data.append(dLine)
            else:                                           # translate line entries if translation function is given
                data.append(self.readFunction(dLine))
    return headerDict, data