Source code for wf

import pylab as pl
import numpy as np

import ansc


#------------------------------------------------------------
### Some constants ###
#------------------------------------------------------------

WF_NEAREST   = 0
WF_LINEAR    = 1
WF_QUADRATIC = 2
WF_SINC      = 3
WF_LANCZOS   = 4

WF_DEFAULT   = 4


#------------------------------------------------------------
### Wf class ###
#------------------------------------------------------------

[docs]class Wf : """The base waveform class""" #------------------------------------------------------------ ### Initialisation ### #------------------------------------------------------------ def __init__(self,ns,fs,t0=0.,name="") : """Create a waveform object""" self.ns = ns self.fs = fs self.t0 = t0 self.name = name self.fft = None # set the fft to None for checks self.t = pl.zeros(self.ns) for i in range(0,self.ns) : self.t[i] = self.t0 + i/self.fs #------------------------------------------------------------ ### Helper functions ### #------------------------------------------------------------
[docs] def getNs(self) : """Return the number of samples""" return self.ns #------------------------------------------------------------
[docs] def getFs(self) : """Return the sampling frequency""" return self.fs #------------------------------------------------------------
[docs] def getT0(self) : """Return the *t0*""" return self.t0 #------------------------------------------------------------
[docs] def getWf(self) : """getWf function prototype""" pass #------------------------------------------------------------ ### WfDouble class ### #------------------------------------------------------------
[docs]class WfDouble(Wf) : """Real waveform class""" #------------------------------------------------------------ ### Initialisation ### #------------------------------------------------------------ def __init__(self,ns,fs,t0=0.,name="") : """Create the waveform object""" Wf.__init__(self,ns,fs,t0,name) self.wf = pl.zeros(ns) #------------------------------------------------------------ ### Helper functions ### #------------------------------------------------------------
[docs] def getWf(self) : """Return the waveform""" return self.wf #------------------------------------------------------------
[docs] def isCompatible(self,wf) : """Check compatibility with another waveform""" compat = False if self.fs == wf.getFs() : compat = True return compat #------------------------------------------------------------
[docs] def isSimilar(self,wf) : """Check compatibility with another waveform""" sim = False if self.isCompatible(wf) and self.t0 == wf.getT0() : sim = True return sim #------------------------------------------------------------
[docs] def setNs(self,ns) : """Reset the number of samples in the waveform""" self.ns = ns self.wf = pl.zeros(self.ns) self.t = pl.zeros(self.ns) for i in range(0,self.ns) : self.t[i] = self.t0 + i/self.fs #------------------------------------------------------------
[docs] def setFs(self,fs) : """Reset the sampling frequency of the waveform""" self.fs = fs #------------------------------------------------------------
[docs] def setT0(self,t0) : """Reset the start time of the waveform""" self.t0 = t0 for i in range(0,self.ns) : self.t[i] = self.t0 + i/self.fs #------------------------------------------------------------
def getValue(self,i) : """Get the value of the sample number *i*""" val = 0. if i >= 0 and i < self.ns : val = self.wf[i] return val #------------------------------------------------------------
[docs] def getValue(self,t,mode) : """Get the value of the waveform at a time *t*""" val = 0. i0 = 0 t1 = t - self.t0 # determine the nearest sample on the left i0 = pl.floor(t1*self.fs) if i0 < 0 : i0 = 0 if i0 > self.ns-2 : i0 = self.ns-2; # no interpolation ( WF_NEAREST ) if mode == self.WF_NEAREST : if t1*self.fs-float(i0) < 0.5 : val = self.wf[i0] else : val = self.wf[i0+1] # linear interpolation ( WF_LINEAR ) elif mode == self.WF_LINEAR : val = self.wf[i0] + (self.wf[i0+1] - self.wf[i0])*(t1*self.fs - float(i0)) # parabolic interpolation elif mode == self.WF_QUADRATIC : if t1*self.fs - float(i0) < 0.5 : if i0 > 0 : val = ansc.quadInterpol(t1,float(i0-1)/self.fs,float(i0)/self.fs,float(i0+1)/self.fs,self.wf[i0-1],self.wf[i0],self.wf[i0+1]) else : val = ansc.quadInterpol(t1,float(i0)/self.fs,float(i0+1)/self.fs, float(i0+2)/self.fs,self.wf[i0],self.wf[i0+1],self.wf[i0+2]) else : if i0 < self.ns-2 : val = ansc.quadInterpol(t1,float(i0)/self.fs,float(i0+1)/self.fs,float(i0+2)/self.fs,self.wf[i0],self.wf[i0+1],self.wf[i0+2]) else : val = ansc.quadInterpol(t1,flot(i0-1)/self.fs,float(i0)/self.fs,float(i0+1)/self.fs,self.wf[i0-1],self.wf[i0],self.wf[i0+1]) # sinc(x) interpolation ( WF_SINC ) elif mode == self.WF_SINC : for i in range(0,self.ns) : val += self.wf[i] * pl.sinc( (t1 - float(i)/self.fs)*self.fs) # lanczos(x) kernel interpolate, best for waveform reconstrucution if mode == self.WF_LANCZOS : lanczos_a = 5; #3; // lanczos kernel width... na1 = int(t1*self.fs - lanczos_a) na1 += 1 na2 = int(t1*self.fs + lanczos_a) if na1 < 0 : na1 = 0 if na2 > self.ns-1 : na2 = self.ns-1 for i in range(na1,na2+1) : val += self.wf[i] * ansc.lanczos( (t1 - float(i)/self.fs)*self.fs, lanczos_a ) return val #------------------------------------------------------------
[docs] def setValue(self,i,val) : """Set the value of the sample *i*""" if i >= 0 and i < self.getNs() : self.getWf()[i] = val #------------------------------------------------------------
[docs] def setValues(self,vals) : """Set the values of the waveform""" for i in range(0,min(pl.shape(vals)[0],self.ns)) : self.getWf()[i] = vals[i] #------------------------------------------------------------
[docs] def subset(self,n1,n2) : """Get a new waveform with a subset of data [n1,n2)""" if n2 < n1 : return pl.nan wf = WfDouble(n2-n1,self.fs,t0+n1/self.fs) for i in range(n1,n2) : wf.getWf()[i] = self.wf[i+n1] return wf #------------------------------------------------------------
[docs] def reset(self) : """Reset the data in the waveform""" self.t0 = 0. self.wf = pl.zeros(self.ns) #------------------------------------------------------------
[docs] def copy(self,wf) : """Save a copy of data in another waveform""" wf.setFs(self.fs) wf.reset() wf.setT0(self.t0) for i in range(0,min(self.ns,wf.getNs())) : wf.getWf()[i] = self.wf[i] #------------------------------------------------------------
[docs] def clone(self) : """Make a new copy of the waveform""" wf = WfDouble(self.ns,self.fs,self.t0) for i in range(0,self.ns) : wf.getWf()[i] = self.wf[i] return wf #------------------------------------------------------------ ### Arithmetics ### #------------------------------------------------------------
[docs] def add(self,wf) : """Add another waveform data""" if self.isSimilar(wf) : self.wf += self.wf + wf.getWf() elif self.isCompatible(wf) : i = 0 t = max(self.t0,wf.getT0()) # fix possible infinite loop!!! while self.t0 + i/self.fs < t : i += 1 # find the first sample t = self.t0 + i/self.fs; # time of the first affected sample while i < self.ns and t < wf.getNs()/wf.getFs()+wf.getT0() : self.wf[i] += wf.getValue(t,self.WF_DEFAULT); t += 1/self.fs i += 1 # close if !!! #------------------------------------------------------------
[docs] def subtract(self,wf) : """Subtract another waveform data""" wf1 = clone(wf) wf1.scale(-1.) self.add(wf1) #------------------------------------------------------------
[docs] def multiply(self,wf) : """Multiply by another waveform data""" if self.isSimilar(wf) : self.wf = self.wf * wf.getWf() elif self.isCompatible(wf) : i = 0; t = max(self.t0,wf.getT0()) while self.t0 + i/self.fs < t : i += 1 # find the first sample t = self.t0 + i/self.fs; # time of the first affected sample while i < self.ns and t < wf.getNs()/wf.getFs()+wf.getT0() : self.wf[i] += wf.getValue(t,self.WF_DEFAULT) t += 1/self.fs i += 1 #------------------------------------------------------------
[docs] def divide(self,wf) : """Multiply by another waveform data""" if self.isSimilar(wf) : self.wf = self.wf/wf.getWf() elif self.isCompatible(wf) : i = 0; t = max(self.t0,wf.getT0()) while self.t0 + i/self.fs < t : i += 1 # find the first sample t = self.t0 + i/self.fs; # time of the first affected sample while i < self.ns and t < wf.getNs()/wf.getFs()+wf.getT0() : self.wf[i] /= wf.getValue(t,self.WF_DEFAULT) t += 1/self.fs i += 1 #------------------------------------------------------------ ### Some other operations and useful functions ### #------------------------------------------------------------
[docs] def scale(self,s) : """Scale the waveform data""" self.wf *= s #------------------------------------------------------------
[docs] def addBias(self,bias) : """Add a bias (pedestal) to the waveform""" self.wf += bias #------------------------------------------------------------
[docs] def addTrend(self,trend) : """Add a trend to the waveform""" for i in range(0,self.ns) : self.wf[i] += trend*i/self.fs #------------------------------------------------------------
[docs] def addSin(self,amp,freq,phi=0.) : """Add a sinusoidal signal""" self.wf += amp*pl.sin(2.*pl.pi*freq*self.t+phi) #------------------------------------------------------------
[docs] def addBurst(self,amp,freq,t1,tp) : """Add an RF burst""" for i in range (0,self.ns) : t = i/self.fs if t >= t1 and t <= t1+tp : self.wf[i] = amp*pl.sin(2*pl.pi*freq*(t-t1)) #------------------------------------------------------------
[docs] def addNoise(self,sigma) : """Add noise to the waveform""" noise = np.random.normal(0.,sigma,self.ns) self.wf += noise #------------------------------------------------------------ ### FFT ### #------------------------------------------------------------
[docs] def doFft(self) : """Take an FFT of the waveform""" self.fft = pl.fft(self.wf) self.f = pl.fftfreq(self.ns,1./self.fs) #------------------------------------------------------------ ### Statistics ### #------------------------------------------------------------
[docs] def getStat(self) : """Get waveform statistics""" self.max = self.wf.max() self.min = self.wf.min() self.mean = self.wf.mean() self.rms = pl.sqrt(self.wf.var()) #------------------------------------------------------------ ### Output ### #------------------------------------------------------------
[docs] def printOut(self) : """Print out the waveform parameters and data""" if self.name != "" : print self._name print "ns = ",self.ns,"fs = ",self.fs,"t0 = ",self.t0 print self.wf #------------------------------------------------------------
[docs] def plotWf(self) : """Plot the waveform""" pl.plot(self.t,self.wf) #------------------------------------------------------------
[docs] def plotSpectrum(self) : """Plot the spectrum of the waveform""" self.doFft() np = int(len(self.f)/2) pl.plot(self.f[0:np],abs(self.fft[0:np])) #------------------------------------------------------------
[docs] def plotFft(self) : """Plot the FFT of the waveform""" self.doFft() pl.plot(self.f,abs(self.fft)) #------------------------------------------------------------
""" int wfDouble::castInt( wfInt *cast ) { if ( !this->initialised() ) { rfdspError( "Waveform is not initialised", __FILE__, __LINE__ ); return RFDSP_FAULT; } if ( !cast->initialised() ) { rfdspError( "Argument waveform is not initialised", __FILE__, __LINE__ ); return RFDSP_FAULT; } if ( this->compatible( cast ) ) { for ( int i = 0; i < MIN( _ns, cast->getNs() ); i++ ) cast->getWf()[i] = (int) round( _wf[i] ); } else { rfdspWarning( "Using default interpolation mode", __FILE__, __LINE__ ); if ( this->castInt( cast, WF_DEFAULT ) ) return RFDSP_FAULT; } return RFDSP_SUCCESS; } //----------------------------------------------------------------- int wfDouble::castInt( wfInt *cast, unsigned int mode ) { if ( !this->initialised() ) { rfdspError( "Waveform is not initialised", __FILE__, __LINE__ ); return RFDSP_FAULT; } if ( !cast->initialised() ) { rfdspError( "Argument waveform is not initialised", __FILE__, __LINE__ ); return RFDSP_FAULT; } int i = 0; double t = MAX( _t0, cast->getT0() ); while ( i/cast->getFs() < t ) i++; t = cast->getT0() + i/cast->getFs(); while ( ( i < cast->getNs() ) && ( t < ns/fs + _t0 ) ) { cast->getWf()[i] = (int) round( this->getValue( t, mode ) ); t += 1/cast->getFs(); i++; } return RFDSP_SUCCESS; } //--------------------------------------------------------------------- wfInt* wfDouble::castIntNew() { wfInt *wf = new wfInt( _ns, _fs, _t0 ); if ( this->castInt( wf ) ) { delete wf; return NULL; } return wf; } """