#!/usr/bin/env python3
"""
This file is part of the PAFFrontendSim.
The PAFFrontendSim is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License v3 as published by the Free Software Foundation.
The PAFFrontendSim is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the PAFFrontendSim. If not, see https://www.gnu.org/licenses/.
"""
#############################################################################
# import libs
#############################################################################
import numpy as np
import matplotlib.pyplot as plt
from copy import copy, deepcopy
import LogClass
from Simulation.UFO import UFODataClass
from Simulation.constants import _ETA, _kB, _EPS, _MU, _C, _I0iso
#############################################################################
# constants
#############################################################################
# Label handed to LogClass.LogClass when ImpedanceMatrix has to create its own logger.
_CLASSNAME = "ImpedanceData"
#############################################################################
# classes
#############################################################################
class ImpedanceMatrix:
    """
    Container for mutual impedance matrix (MIM) data.

    In a PAF, the radiation pattern of each antenna is affected by every other antenna in the array.
    As a result there is a distinction between isolated element patterns (radiation pattern of an
    antenna in isolation) and embedded element patterns, which include the effects of the other
    antennas. The embedded element patterns can be determined through the MIM.

    The MIM (usually denoted as Z in literature) is an N x N matrix, where N is the number of
    antennas and the entries are complex values in Ohm. It describes the relationship between
    voltage and current at each port: V = Z * I, with V and I being column vectors of length N.
    The MIM can also be transformed into S-Parameters.

    The matrix is filled by correlateFields (or the file based, deprecated correlate) and can be
    stored to / loaded from a text file with writeFile / readFile.
    """

    #####################################
    # Keywords for the file
    # NOTE: 'DETECTORGAIN' has no trailing colon, unlike the other keys. It is part of the
    #       on-disk format (written and parsed with this exact token), so it must stay as is.
    __KEYS = {
        "Comment"      : '#',
        "FileType"     : 'IMPEDANCE-MATRIX:',
        "Wavelength"   : 'WAVELENGTH:',
        "Impedance"    : 'IMPEDANCE:',
        "I0"           : 'I0:',
        "Bandwidth"    : 'BANDWIDTH:',
        "DetectorGain" : 'DETECTORGAIN',
        "Aeff"         : 'A_EFFECTIVE:',
        "NoElem"       : 'NO_OF_ELEMENTS:',
        "Data"         : 'DATA:',
        "Name"         : 'NAME:'
    }

    #####################################
    # internal data and init method
    def __init__(self, wl = 214., z= 50., i0 = _I0iso, bw = 1000000., dg = 1., aa = 0., log = None, name = ""):
        """
        Initialize class.

        Parameters:
            wl (float): Wavelength at which the simulation was run [mm]
            z (float): Input impedance of the used antenna system
            i0 (float): Excitation current of the antenna system used for the simulations [A]
            bw (float): Channel bandwidth [Hz] for the calculation
            dg (float): Detector gain (used to avoid numerical problems)
            aa (float): Effective antenna collecting area
            log (LogClass): Logging class; it is deep-copied. A new one is created if None.
            name (string): Name of the impedance matrix
        """
        if log is None:
            self.log = LogClass.LogClass(_CLASSNAME)
        else:
            # NOTE(review): indentation of the original was lost; addLabel is assumed to apply
            #               only to a logger copied from the caller -- confirm.
            self.log = deepcopy(log)     # logging class used for all in and output messages
            self.log.addLabel(self)

        self.Wavelength = wl             # Wavelength at which the simulation was run [mm]
        self.Impedance = z               # input impedance of the used antenna system
        self.I0 = i0                     # excitation current of the antenna system used for the simulations [A]
        self.Bandwidth = bw              # channel-bandwidth [Hz] for the calculation
        self.DetectorGain = dg           # detector gain (used to avoid numerical problems)
        self.Aeff = aa                   # effective antenna collecting area
        self.noElements = 0              # number of elements in the array (int: used in range() and as array size)
        self.Data = []                   # complex data of the matrix (numpy array once filled)
        self.Name = name                 # name of the impedance matrix

    def printOut(self, line, message_type, noSpace = True, lineFeed = True):
        """
        Wrapper for the printOut method of the actual log class.
        """
        self.log.printOut(line, message_type, noSpace, lineFeed)

    #####################################
    # File-IO methods
    def writeFile(self, filename):
        """
        Method to write the impedance matrix data to a file.

        Parameters:
            filename (string): File to be written.
        """
        self.printOut("Writing File %s" % filename, self.log.FLOW_TYPE)

        keys = self.__KEYS
        comment = keys['Comment']
        noElements = int(self.noElements)    # tolerate a float count; range() below needs an int

        # Build the header first, so a formatting error does not leave a truncated file behind.
        header = [
            comment + " PAF-Simulator mutual impedance matrix file\n",
            comment + " \n",
            comment + " File-Header\n",
            keys['FileType'] + " # Type Identifier (only lines after Type-ID are parsed during reading\n",
            comment + " \n",
            keys['Name'] + " %s \n" % self.Name,
            comment + " \n",
            comment + " Exitation data (Wavelength, I0 and input impedance)\n",
            keys['Wavelength'] + " %.3f mm\n" % self.Wavelength,
            keys['I0'] + " %.5f A\n" % self.I0,
            keys['Bandwidth'] + " %.5f Hz\n" % self.Bandwidth,
            keys['DetectorGain'] + " %.5f \n" % self.DetectorGain,
            keys['Impedance'] + " %.5f Ohm\n" % self.Impedance,
            comment + " \n",
            keys['Aeff'] + " %.5f mm^2\n" % self.Aeff,
            comment + " \n",
            comment + " Data-Block (index_A index_B impedance_real impedance_imaginary):\n",
            keys['NoElem'] + " %d \n" % noElements,
            comment + " \n",
        ]

        # 'with' guarantees the handle is closed even if writing the data block fails.
        with open(filename, 'w') as fh:
            fh.writelines(header)
            # write data block, one matrix entry per line
            for indexA in range(noElements):
                for indexB in range(noElements):
                    value = self.Data[indexA, indexB]
                    fh.write(keys['Data'] + " %d %d %.6f %.6f\n" % (indexA, indexB, value.real, value.imag))
            # write end block
            fh.write(comment + " \n")
            fh.write(comment + " end of file\n")

    def readFile(self, filename):
        """
        Method to read an impedance matrix data file.

        Lines in front of the type identifier are ignored. The data block is only parsed after
        the number of elements was read, because that line allocates the matrix.

        Parameters:
            filename (string): Name of the input file

        Returns:
            idOK (boolean): error flag (true on success, false on error)
        """
        self.printOut("Reading File %s" % filename, self.log.FLOW_TYPE)

        keys = self.__KEYS
        # header keyword -> attribute that receives the float value in the second column
        floatFields = {
            keys['Wavelength']   : 'Wavelength',
            keys['I0']           : 'I0',
            keys['Impedance']    : 'Impedance',
            keys['Bandwidth']    : 'Bandwidth',
            keys['DetectorGain'] : 'DetectorGain',
            keys['Aeff']         : 'Aeff',
        }

        self.Data = []                   # delete all data
        idOK = False                     # set once the type identifier was found
        dataOK = False                   # set once the matrix was allocated

        with open(filename, 'r') as f:   # the context manager closes the file
            for line in f:
                columns = line.split()   # split() also drops CR / NL at the line end
                if not columns:
                    continue
                key = columns[0]

                if key == keys['FileType']:          # check for correct type-ID
                    idOK = True
                    continue
                if not idOK:                         # nothing is parsed before the type-ID
                    continue

                if key in floatFields:
                    setattr(self, floatFields[key], float(columns[1]))
                elif key == keys['Name']:
                    # the name is written by writeFile; keep the current name if the file has none
                    fileName = " ".join(columns[1:])
                    if fileName:
                        self.Name = fileName
                elif key == keys['NoElem']:
                    self.noElements = int(float(columns[1]))
                    self.Data = np.zeros((self.noElements, self.noElements), dtype = complex)
                    dataOK = True
                elif key == keys['Data'] and dataOK:
                    indexA = int(float(columns[1]))
                    indexB = int(float(columns[2]))
                    self.Data[indexA, indexB] = float(columns[3]) + 1j * float(columns[4])

        if not idOK:
            self.printOut(" Error while reading", self.log.ERROR_TYPE, False)
        return idOK

    #####################################
    # plotting methods
    def plotMatrix(self, name = "", figname = "", show = False):
        """
        Plot the amplitude of the impedance matrix, normalized to its largest entry.

        Parameters:
            name (string): Name of the matrix (shown in the title)
            figname (string): Filename of the figure (if given, the figure is only shown on screen if show == True)
            show (boolean): Flag to show plot on screen even when being written to disk (True)
        """
        self.printOut("Plotting Impedance matrix:", self.log.FLOW_TYPE)

        mat = np.absolute(self.Data)
        if mat.size == 0:                # nothing loaded / correlated so far
            self.printOut(" no data available -- nothing to plot", self.log.WARNING_TYPE, False)
            return

        peak = np.max(mat)
        if peak > 0.:                    # an all-zero matrix is plotted as is (avoids 0/0)
            mat = mat / peak

        fig, ax = plt.subplots()
        cf = ax.matshow(mat, cmap = plt.get_cmap('rainbow'), vmin = 0.0, vmax = 1.0)
        ax.set_xlabel("Element number")
        ax.set_ylabel("Element number")
        ax.set_title("Mutual impedance matrix %s (relative amplitude)" % name)
        fig.colorbar(cf)
        ax.set_aspect('equal')

        if figname != "":
            fig.savefig(figname)
        if show or figname == "":        # without a file name the plot is always shown
            plt.show()
        plt.close(fig)

    #####################################
    # calculate matrix
    def correlate(self, baseName, detectorGain = 10000.):
        """
        Calculate mutual impedance matrix out of the individual element far-field data or spill-over data.
        Method will be removed in future!!!! (not necessary anymore to work on files)

        Files are expected to be named '<baseName><element-no>_<wavelength>.dat'. Only files whose
        wavelength equals self.Wavelength are used; the element number is used as matrix index.

        Parameters:
            baseName (string): Common prefix (including path) of the UFO files
            detectorGain (float): Detector gain; stored in self.DetectorGain and used for scaling

        Returns:
            boolean (boolean): True on success.
        """
        # local imports: only this deprecated, file based method needs them
        import glob
        import os

        self.DetectorGain = detectorGain
        self.printOut("Correlating UFO files with base-name '%s'" % baseName, self.log.FLOW_TYPE)

        # get list of filenames from base-name
        baseName = baseName.replace("//", "/")       # remove double slash if present
        # escape the prefix so that only the trailing '*' acts as wildcard
        names = sorted(glob.glob(glob.escape(baseName) + "*.dat"))
        prefix = os.path.basename(baseName)          # part of the base-name inside the file name

        fields = []     # per loaded element: [point, 3] complex field, weighted with sqrt(df)
        noList = []     # list of the element numbers (taken from filename)
        self.printOut(" loading files :", self.log.DEBUG_TYPE, False)
        ufo = UFODataClass.UFOData(log = self.log)   # ufo class, reused for all files

        for name in names:                           # load all data files
            na = os.path.basename(name)[len(prefix):]    # strip path and base-name
            nl = na.split('_')                           # '<element-no>', '<wavelength>.dat'
            n = nl[0]
            if (n == '') or (len(nl) < 2):           # if element number does not exist, discard name
                self.printOut(" discarding file %s" % name, self.log.WARNING_TYPE, False)
                continue
            try:
                elementNo = int(n)
                wl = float(nl[1].split('.dat')[0])   # extract wavelength from filename
            except ValueError:                       # file matches the pattern but not the naming scheme
                self.printOut(" discarding file %s" % name, self.log.WARNING_TYPE, False)
                continue
            if wl != self.Wavelength:                # only use files of the current wavelength
                continue

            noList.append(elementNo)
            ufo.readFile(name)
            samples = [ufo.Data[idx] for idx in range(ufo.points)]
            sqrtdf = np.sqrt(np.array([s.df for s in samples], dtype = float))
            field = np.array([[s.Er.x + 1j * s.Ei.x,
                               s.Er.y + 1j * s.Ei.y,
                               s.Er.z + 1j * s.Ei.z] for s in samples],
                             dtype = np.complex128).reshape(-1, 3)
            fields.append(field * sqrtdf[:, np.newaxis])
            self.printOut(" file %s with element number %d was loaded" % (name, elementNo), self.log.DEBUG_TYPE, False, False)

        self.noElements = len(noList)                # set number of elements in the array
        self.Data = np.zeros((self.noElements, self.noElements), dtype = complex)
        if self.noElements == 0:                     # check if data is present
            self.printOut(" no data loaded -- finished", self.log.WARNING_TYPE, False)
            return False

        # The element numbers index the matrix directly, so they must fit into it.
        if any((no < 0) or (no >= self.noElements) for no in noList):
            self.printOut(" element numbers do not match the number of loaded files", self.log.ERROR_TYPE, False)
            return False

        self.printOut(" correlating fields", self.log.DATA_TYPE, False)
        scale = (0.5 / _ETA) / detectorGain**2.
        for d1 in range(self.noElements):            # loop all element pairs (upper triangle)
            for d2 in range(d1, self.noElements):
                a, b = noList[d1], noList[d2]
                self.printOut(" correlating fields %d and %d" % (a, b), self.log.FLOW_TYPE, False, False)
                # sum over all points and components of E_d1 * conj(E_d2) (vdot conjugates its first argument)
                self.Data[a, b] = np.vdot(fields[d2], fields[d1]) * scale
                # populate second half: the matrix is Hermitian (same as in correlateFields)
                self.Data[b, a] = np.conjugate(self.Data[a, b])
                self.printOut(" ==> %.4f %+.4fi" % (self.Data[a, b].real, self.Data[a, b].imag), self.log.FLOW_TYPE, False)
        return True

    def correlateFields(self, e_fields, df_array):
        """
        Correlate the fields given in e_fields with the corresponding surface array in df_array and fill the matrix.

        Parameters:
            e_fields (numpy.array): [element-no., sky position point no., 3 (E_vector complex, 3d)] of the e-fields
            df_array (numpy.array): [sky position point no.] of surface element sizes

        Returns:
            boolean (boolean): True if successful.
        """
        self.printOut("Correlating fields", self.log.FLOW_TYPE)
        self.noElements = len(e_fields)              # set number of elements in the array
        self.Data = np.zeros((self.noElements, self.noElements), dtype = complex)
        self.printOut(" of %d elements" % self.noElements, self.log.DATA_TYPE, False)
        if self.noElements == 0:                     # check if data is present
            self.printOut(" no data available -- finished", self.log.WARNING_TYPE, False)
            return False

        sqrtdf = np.sqrt(np.asarray(df_array))       # sqrt of the surface area vector
        # Weight every field once instead of once per element pair; only the three
        # vector components are used.
        weighted = np.asarray(e_fields)[..., :3] * sqrtdf[..., np.newaxis]
        scale = (0.5 / _ETA) / self.DetectorGain**2.

        for d1 in range(self.noElements):            # loop all element pairs (upper triangle)
            for d2 in range(d1, self.noElements):
                self.printOut(" correlating fields %d and %d" % (d1, d2), self.log.FLOW_TYPE, False, False)
                # sum over all points and components of E_d1 * conj(E_d2) (vdot conjugates its first argument)
                self.Data[d1, d2] = np.vdot(weighted[d2], weighted[d1]) * scale
                # populate second half
                self.Data[d2, d1] = np.conjugate(self.Data[d1, d2])
                # NOTE(review): assumed to sit inside the inner loop (terminates the log line above) -- confirm.
                self.printOut("", self.log.FLOW_TYPE, False)
        return True

    def getMutualPower(self, element):
        """
        Calculates the power coupled to the other antennas.

        Parameters:
            element (int): Number of the element to calculate (None is treated as 0).

        Returns:
            p (float): Sum of the absolute values of the matrix row 'element' without the diagonal entry.
            complex: The diagonal entry (element self overlap) of that row.
        """
        if element is None:
            element = 0
        row = np.asarray(self.Data)[element, :]
        # mask instead of a Python loop; compares plain positions exactly like the former idx != element
        others = np.arange(row.shape[0]) != element
        return np.absolute(row[others]).sum(), row[element]
# end of ImpedanceMatrix
#####################################