import pylab as pl
import numpy as np
import ansc
#------------------------------------------------------------
### Some constants ###
#------------------------------------------------------------
WF_NEAREST = 0
WF_LINEAR = 1
WF_QUADRATIC = 2
WF_SINC = 3
WF_LANCZOS = 4
WF_DEFAULT = 4
#------------------------------------------------------------
### Wf class ###
#------------------------------------------------------------
[docs]class Wf :
"""The base waveform class"""
#------------------------------------------------------------
### Initialisation ###
#------------------------------------------------------------
def __init__(self,ns,fs,t0=0.,name="") :
"""Create a waveform object"""
self.ns = ns
self.fs = fs
self.t0 = t0
self.name = name
self.fft = None # set the fft to None for checks
self.t = pl.zeros(self.ns)
for i in range(0,self.ns) :
self.t[i] = self.t0 + i/self.fs
#------------------------------------------------------------
### Helper functions ###
#------------------------------------------------------------
[docs] def getNs(self) :
"""Return the number of samples"""
return self.ns
#------------------------------------------------------------
[docs] def getFs(self) :
"""Return the sampling frequency"""
return self.fs
#------------------------------------------------------------
[docs] def getT0(self) :
"""Return the *t0*"""
return self.t0
#------------------------------------------------------------
[docs] def getWf(self) :
"""getWf function prototype"""
pass
#------------------------------------------------------------
### WfDouble class ###
#------------------------------------------------------------
[docs]class WfDouble(Wf) :
"""Real waveform class"""
#------------------------------------------------------------
### Initialisation ###
#------------------------------------------------------------
def __init__(self,ns,fs,t0=0.,name="") :
"""Create the waveform object"""
Wf.__init__(self,ns,fs,t0,name)
self.wf = pl.zeros(ns)
#------------------------------------------------------------
### Helper functions ###
#------------------------------------------------------------
[docs] def getWf(self) :
"""Return the waveform"""
return self.wf
#------------------------------------------------------------
[docs] def isCompatible(self,wf) :
"""Check compatibility with another waveform"""
compat = False
if self.fs == wf.getFs() :
compat = True
return compat
#------------------------------------------------------------
[docs] def isSimilar(self,wf) :
"""Check compatibility with another waveform"""
sim = False
if self.isCompatible(wf) and self.t0 == wf.getT0() :
sim = True
return sim
#------------------------------------------------------------
[docs] def setNs(self,ns) :
"""Reset the number of samples in the waveform"""
self.ns = ns
self.wf = pl.zeros(self.ns)
self.t = pl.zeros(self.ns)
for i in range(0,self.ns) :
self.t[i] = self.t0 + i/self.fs
#------------------------------------------------------------
[docs] def setFs(self,fs) :
"""Reset the sampling frequency of the waveform"""
self.fs = fs
#------------------------------------------------------------
[docs] def setT0(self,t0) :
"""Reset the start time of the waveform"""
self.t0 = t0
for i in range(0,self.ns) :
self.t[i] = self.t0 + i/self.fs
#------------------------------------------------------------
def getValue(self,i) :
"""Get the value of the sample number *i*"""
val = 0.
if i >= 0 and i < self.ns :
val = self.wf[i]
return val
#------------------------------------------------------------
[docs] def getValue(self,t,mode) :
"""Get the value of the waveform at a time *t*"""
val = 0.
i0 = 0
t1 = t - self.t0
# determine the nearest sample on the left
i0 = pl.floor(t1*self.fs)
if i0 < 0 :
i0 = 0
if i0 > self.ns-2 :
i0 = self.ns-2;
# no interpolation ( WF_NEAREST )
if mode == self.WF_NEAREST :
if t1*self.fs-float(i0) < 0.5 :
val = self.wf[i0]
else :
val = self.wf[i0+1]
# linear interpolation ( WF_LINEAR )
elif mode == self.WF_LINEAR :
val = self.wf[i0] + (self.wf[i0+1] - self.wf[i0])*(t1*self.fs - float(i0))
# parabolic interpolation
elif mode == self.WF_QUADRATIC :
if t1*self.fs - float(i0) < 0.5 :
if i0 > 0 :
val = ansc.quadInterpol(t1,float(i0-1)/self.fs,float(i0)/self.fs,float(i0+1)/self.fs,self.wf[i0-1],self.wf[i0],self.wf[i0+1])
else :
val = ansc.quadInterpol(t1,float(i0)/self.fs,float(i0+1)/self.fs, float(i0+2)/self.fs,self.wf[i0],self.wf[i0+1],self.wf[i0+2])
else :
if i0 < self.ns-2 :
val = ansc.quadInterpol(t1,float(i0)/self.fs,float(i0+1)/self.fs,float(i0+2)/self.fs,self.wf[i0],self.wf[i0+1],self.wf[i0+2])
else :
val = ansc.quadInterpol(t1,flot(i0-1)/self.fs,float(i0)/self.fs,float(i0+1)/self.fs,self.wf[i0-1],self.wf[i0],self.wf[i0+1])
# sinc(x) interpolation ( WF_SINC )
elif mode == self.WF_SINC :
for i in range(0,self.ns) :
val += self.wf[i] * pl.sinc( (t1 - float(i)/self.fs)*self.fs)
# lanczos(x) kernel interpolate, best for waveform reconstrucution
if mode == self.WF_LANCZOS :
lanczos_a = 5; #3; // lanczos kernel width...
na1 = int(t1*self.fs - lanczos_a)
na1 += 1
na2 = int(t1*self.fs + lanczos_a)
if na1 < 0 :
na1 = 0
if na2 > self.ns-1 :
na2 = self.ns-1
for i in range(na1,na2+1) :
val += self.wf[i] * ansc.lanczos( (t1 - float(i)/self.fs)*self.fs, lanczos_a )
return val
#------------------------------------------------------------
[docs] def setValue(self,i,val) :
"""Set the value of the sample *i*"""
if i >= 0 and i < self.getNs() :
self.getWf()[i] = val
#------------------------------------------------------------
[docs] def setValues(self,vals) :
"""Set the values of the waveform"""
for i in range(0,min(pl.shape(vals)[0],self.ns)) :
self.getWf()[i] = vals[i]
#------------------------------------------------------------
[docs] def subset(self,n1,n2) :
"""Get a new waveform with a subset of data [n1,n2)"""
if n2 < n1 :
return pl.nan
wf = WfDouble(n2-n1,self.fs,t0+n1/self.fs)
for i in range(n1,n2) :
wf.getWf()[i] = self.wf[i+n1]
return wf
#------------------------------------------------------------
[docs] def reset(self) :
"""Reset the data in the waveform"""
self.t0 = 0.
self.wf = pl.zeros(self.ns)
#------------------------------------------------------------
[docs] def copy(self,wf) :
"""Save a copy of data in another waveform"""
wf.setFs(self.fs)
wf.reset()
wf.setT0(self.t0)
for i in range(0,min(self.ns,wf.getNs())) :
wf.getWf()[i] = self.wf[i]
#------------------------------------------------------------
[docs] def clone(self) :
"""Make a new copy of the waveform"""
wf = WfDouble(self.ns,self.fs,self.t0)
for i in range(0,self.ns) :
wf.getWf()[i] = self.wf[i]
return wf
#------------------------------------------------------------
### Arithmetics ###
#------------------------------------------------------------
[docs] def add(self,wf) :
"""Add another waveform data"""
if self.isSimilar(wf) :
self.wf += self.wf + wf.getWf()
elif self.isCompatible(wf) :
i = 0
t = max(self.t0,wf.getT0())
# fix possible infinite loop!!!
while self.t0 + i/self.fs < t :
i += 1 # find the first sample
t = self.t0 + i/self.fs; # time of the first affected sample
while i < self.ns and t < wf.getNs()/wf.getFs()+wf.getT0() :
self.wf[i] += wf.getValue(t,self.WF_DEFAULT);
t += 1/self.fs
i += 1
# close if !!!
#------------------------------------------------------------
[docs] def subtract(self,wf) :
"""Subtract another waveform data"""
wf1 = clone(wf)
wf1.scale(-1.)
self.add(wf1)
#------------------------------------------------------------
[docs] def multiply(self,wf) :
"""Multiply by another waveform data"""
if self.isSimilar(wf) :
self.wf = self.wf * wf.getWf()
elif self.isCompatible(wf) :
i = 0;
t = max(self.t0,wf.getT0())
while self.t0 + i/self.fs < t :
i += 1 # find the first sample
t = self.t0 + i/self.fs; # time of the first affected sample
while i < self.ns and t < wf.getNs()/wf.getFs()+wf.getT0() :
self.wf[i] += wf.getValue(t,self.WF_DEFAULT)
t += 1/self.fs
i += 1
#------------------------------------------------------------
[docs] def divide(self,wf) :
"""Multiply by another waveform data"""
if self.isSimilar(wf) :
self.wf = self.wf/wf.getWf()
elif self.isCompatible(wf) :
i = 0;
t = max(self.t0,wf.getT0())
while self.t0 + i/self.fs < t :
i += 1 # find the first sample
t = self.t0 + i/self.fs; # time of the first affected sample
while i < self.ns and t < wf.getNs()/wf.getFs()+wf.getT0() :
self.wf[i] /= wf.getValue(t,self.WF_DEFAULT)
t += 1/self.fs
i += 1
#------------------------------------------------------------
### Some other operations and useful functions ###
#------------------------------------------------------------
[docs] def scale(self,s) :
"""Scale the waveform data"""
self.wf *= s
#------------------------------------------------------------
[docs] def addBias(self,bias) :
"""Add a bias (pedestal) to the waveform"""
self.wf += bias
#------------------------------------------------------------
[docs] def addTrend(self,trend) :
"""Add a trend to the waveform"""
for i in range(0,self.ns) :
self.wf[i] += trend*i/self.fs
#------------------------------------------------------------
[docs] def addSin(self,amp,freq,phi=0.) :
"""Add a sinusoidal signal"""
self.wf += amp*pl.sin(2.*pl.pi*freq*self.t+phi)
#------------------------------------------------------------
[docs] def addBurst(self,amp,freq,t1,tp) :
"""Add an RF burst"""
for i in range (0,self.ns) :
t = i/self.fs
if t >= t1 and t <= t1+tp :
self.wf[i] = amp*pl.sin(2*pl.pi*freq*(t-t1))
#------------------------------------------------------------
[docs] def addNoise(self,sigma) :
"""Add noise to the waveform"""
noise = np.random.normal(0.,sigma,self.ns)
self.wf += noise
#------------------------------------------------------------
### FFT ###
#------------------------------------------------------------
[docs] def doFft(self) :
"""Take an FFT of the waveform"""
self.fft = pl.fft(self.wf)
self.f = pl.fftfreq(self.ns,1./self.fs)
#------------------------------------------------------------
### Statistics ###
#------------------------------------------------------------
[docs] def getStat(self) :
"""Get waveform statistics"""
self.max = self.wf.max()
self.min = self.wf.min()
self.mean = self.wf.mean()
self.rms = pl.sqrt(self.wf.var())
#------------------------------------------------------------
### Output ###
#------------------------------------------------------------
[docs] def printOut(self) :
"""Print out the waveform parameters and data"""
if self.name != "" :
print self._name
print "ns = ",self.ns,"fs = ",self.fs,"t0 = ",self.t0
print self.wf
#------------------------------------------------------------
[docs] def plotWf(self) :
"""Plot the waveform"""
pl.plot(self.t,self.wf)
#------------------------------------------------------------
[docs] def plotSpectrum(self) :
"""Plot the spectrum of the waveform"""
self.doFft()
np = int(len(self.f)/2)
pl.plot(self.f[0:np],abs(self.fft[0:np]))
#------------------------------------------------------------
[docs] def plotFft(self) :
"""Plot the FFT of the waveform"""
self.doFft()
pl.plot(self.f,abs(self.fft))
#------------------------------------------------------------
"""
int wfDouble::castInt( wfInt *cast ) {
if ( !this->initialised() ) {
rfdspError( "Waveform is not initialised", __FILE__, __LINE__ );
return RFDSP_FAULT;
}
if ( !cast->initialised() ) {
rfdspError( "Argument waveform is not initialised", __FILE__, __LINE__ );
return RFDSP_FAULT;
}
if ( this->compatible( cast ) ) {
for ( int i = 0; i < MIN( _ns, cast->getNs() ); i++ ) cast->getWf()[i] = (int) round( _wf[i] );
} else {
rfdspWarning( "Using default interpolation mode", __FILE__, __LINE__ );
if ( this->castInt( cast, WF_DEFAULT ) ) return RFDSP_FAULT;
}
return RFDSP_SUCCESS;
}
//-----------------------------------------------------------------
int wfDouble::castInt( wfInt *cast, unsigned int mode ) {
if ( !this->initialised() ) {
rfdspError( "Waveform is not initialised", __FILE__, __LINE__ );
return RFDSP_FAULT;
}
if ( !cast->initialised() ) {
rfdspError( "Argument waveform is not initialised", __FILE__, __LINE__ );
return RFDSP_FAULT;
}
int i = 0;
double t = MAX( _t0, cast->getT0() );
while ( i/cast->getFs() < t ) i++;
t = cast->getT0() + i/cast->getFs();
while ( ( i < cast->getNs() ) && ( t < ns/fs + _t0 ) ) {
cast->getWf()[i] = (int) round( this->getValue( t, mode ) );
t += 1/cast->getFs();
i++;
}
return RFDSP_SUCCESS;
}
//---------------------------------------------------------------------
wfInt* wfDouble::castIntNew() {
wfInt *wf = new wfInt( _ns, _fs, _t0 );
if ( this->castInt( wf ) ) {
delete wf;
return NULL;
}
return wf;
}
"""