Data Acquisition with the NI USB-6212
NI-DAQmx comes with a C API. The help for the C API is installed in:
C:\Program Files (x86)\National Instruments\NI-DAQ\Docs\cdaqmx.chm
Usage, however, is best learned from the examples in:
C:\Users\public\Documents\National Instruments\NI-DAQ\Examples\DAQmx ANSI C
I worked with PyDAQmx, a Python wrapper. The methods of its Task class are named like the C functions without the DAQmx prefix, so the help for the ANSI C functions applies to them as well. The tests that come with the wrapper are a good starting point.
Here are some tests of mine.
Write to analogue out and read back in at the same time
Physically connect ao0 with ai0. The code writes a signal to ao0 and reads it back in via ai0. Dev2 is the device as shown in NI MAX (Measurement & Automation Explorer).
    from ctypes import byref

    import numpy as np
    import matplotlib.pyplot as plt
    from PyDAQmx import *

    DEV = "Dev2"
    f_s = 1000  # sample rate in Hz; the buffers hold one second of data

    # Input task: f_s samples from ai0, referenced single-ended, +/-10 V
    tAI = Task()
    tAI.CreateAIVoltageChan(DEV+"/ai0","Voltage",DAQmx_Val_RSE,-10,10,DAQmx_Val_Volts,None)
    tAI.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_FiniteSamps,f_s)

    # Output signal sin(x)/x with x = 24*v/f_s (np.sin, a bare sin is not defined here)
    written = int32()
    dataO = np.array([np.sin(24.0*v/f_s)*f_s/v/24.0 for v in range(1,f_s+1)])

    # Output task: fill the buffer first (autoStart=0), then start the task
    tAO = Task()
    tAO.CreateAOVoltageChan(DEV+"/ao0","",-10.0,10.0,DAQmx_Val_Volts,None)
    tAO.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_ContSamps,f_s)
    tAO.WriteAnalogF64(f_s,0,10.0,DAQmx_Val_GroupByChannel,dataO,byref(written),None)
    tAO.StartTask()

    # Start the input task, then block until all f_s samples have been read
    dataI = np.zeros(f_s)
    readI = int32()
    tAI.StartTask()
    tAI.ReadAnalogF64(f_s,10.0,DAQmx_Val_GroupByChannel,dataI,f_s,byref(readI),None)

    tAI.StopTask()
    tAI.ClearTask()
    tAO.StopTask()
    tAO.ClearTask()

    plt.plot(dataI)
    plt.show()
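The output signal in this test is a sin(x)/x pulse, which is not obvious from the list comprehension. The following sketch builds the same samples with vectorized NumPy and checks that they fit the configured output range of -10 V to 10 V. It needs no hardware; the names x, data_out, peak and fits_range are only used for this illustration.

```python
import numpy as np

f_s = 1000                      # samples per second, as in the test above
v = np.arange(1, f_s + 1)       # sample index 1..f_s (starting at 1 avoids 0/0)
x = 24.0 * v / f_s              # argument of the pulse, from 0.024 to 24 rad
data_out = np.sin(x) / x        # same values as sin(24*v/f_s)*f_s/v/24

# The AO channel is created for -10 V .. 10 V, so the signal has to stay inside.
peak = float(np.max(np.abs(data_out)))
fits_range = peak <= 10.0
```

The largest sample is the first one, just below 1 V, so the signal uses only a small part of the output range.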
Reading more channels at the same time
I tried to create two tasks, each with an input channel. When starting them I got the error:
The specified resource is reserved.
Making the two channels part of the same task works.
    from ctypes import byref

    import numpy as np
    import matplotlib.pyplot as plt
    from PyDAQmx import *

    DEV = "Dev2"
    f_s = 1000      # sample rate in Hz
    t = 1           # acquisition time in s
    V_MAX = 10      # V
    SHUNT = 100     # shunt resistor in ohm
    I_MAX = 0.1     # A
    timeout = 10.0  # s
    Nch = 2

    tsk = Task()
    tsk.CreateAIVoltageChan(DEV+"/ai0","Voltage",DAQmx_Val_RSE,-V_MAX,V_MAX,DAQmx_Val_Volts,None)
    # Taking the previous or the next line away makes the commented lines below work
    tsk.CreateAICurrentChan(DEV+"/ai1","Current",DAQmx_Val_Cfg_Default,-I_MAX,I_MAX,DAQmx_Val_Amps,DAQmx_Val_Default,SHUNT,None)

    Ns = t*f_s  # samples per channel
    tsk.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_FiniteSamps,Ns)

    # A third output channel on a separate task does not work (1)
    # written = int32()
    # dataAO = np.array([np.sin(24.0*v/f_s)*f_s/v/24.0 for v in range(1,f_s+1)])
    # tAO = Task()
    # tAO.CreateAOVoltageChan(DEV+"/ao0","",-10.0,10.0,DAQmx_Val_Volts,None)
    # tAO.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_ContSamps,f_s)
    # tAO.WriteAnalogF64(f_s,0,10.0,DAQmx_Val_GroupByChannel,dataAO,None,None)
    # tAO.StartTask()

    # Start the task, then block until Ns samples per channel have been read
    read = int32()
    data = np.zeros(Nch*Ns)
    tsk.StartTask()
    tsk.ReadAnalogF64(Ns,timeout,DAQmx_Val_GroupByChannel,data,Nch*Ns,byref(read),None)

    # A third output channel on a separate task does not work (2)
    # tAO.StopTask()
    # tAO.ClearTask()

    tsk.StopTask()
    tsk.ClearTask()

    # GroupByChannel: all ai0 samples come first, then all ai1 samples
    data = data.reshape(Nch,Ns).T  # shape (Ns, Nch), one column per channel
    plt.plot(data)
    plt.show()
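The constants V_MAX, SHUNT and I_MAX in this test are tied together by Ohm's law. A small worked check, assuming the 100 ohm shunt is wired externally across ai1 (the helper names shunt_voltage and current_from_voltage are made up for this sketch and are not part of PyDAQmx):

```python
import math

SHUNT = 100.0   # ohm, value passed to CreateAICurrentChan
I_MAX = 0.1     # A, current range of the channel
V_MAX = 10.0    # V, input range used for the voltage channel

def shunt_voltage(current_a, shunt_ohm):
    """Voltage across the shunt for a given current (U = I * R)."""
    return current_a * shunt_ohm

def current_from_voltage(voltage_v, shunt_ohm):
    """Current that corresponds to a measured shunt voltage (I = U / R)."""
    return voltage_v / shunt_ohm

# At the current limit the shunt drops 0.1 A * 100 ohm = 10 V.
v_at_limit = shunt_voltage(I_MAX, SHUNT)
within_input_range = v_at_limit <= V_MAX or math.isclose(v_at_limit, V_MAX)

# A reading of 2.5 V across the shunt corresponds to 25 mA.
i_example = current_from_voltage(2.5, SHUNT)
```

With these values the current range of 0.1 A maps exactly onto the 10 V input range, so a larger shunt or a larger current range would exceed it.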
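For output, call StartTask() after WriteAnalogF64(), so that the buffer is filled before generation begins. ReadAnalogF64() also works without StartTask(), because it starts the task implicitly, but calling StartTask() explicitly keeps control over when the task starts and stops.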
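DAQmx_Val_GroupByChannel fills the buffer in blocks by channel: all Ns samples of the first channel, then all Ns samples of the second. The flat buffer therefore has to be reshaped to (Nch, Ns); transposing it afterwards gives an (Ns, Nch) array with one column per channel. Setting data.shape = (Ns, Nch) directly would only be right for interleaved data (DAQmx_Val_GroupByScanNumber).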
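The buffer layout can be tried out without hardware. The sketch below fills a small array by hand the way a read with DAQmx_Val_GroupByChannel would (the sample values are made up) and compares the two ways of reshaping it.

```python
import numpy as np

Nch, Ns = 2, 4

# Hand-made buffer in GroupByChannel order: channel 0 block, then channel 1 block.
flat = np.array([0.0, 0.1, 0.2, 0.3,     # channel 0
                 1.0, 1.1, 1.2, 1.3])    # channel 1

# One row per channel, then transpose to one column per channel.
by_channel = flat.reshape(Nch, Ns)
columns = by_channel.T                   # shape (Ns, Nch)

# Reshaping straight to (Ns, Nch) mixes both channels into each column;
# that shape fits only interleaved data (DAQmx_Val_GroupByScanNumber).
wrong = flat.reshape(Ns, Nch)
```

Here columns[:, 0] holds the channel 0 samples, while wrong[:, 0] contains values from both channels.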
I used the above sampling code (with t > 1) to verify that a given frequency is present in a rather noisy signal.
    import numpy as np
    import scipy.signal as sps

    def has_frequency(data, f, f_s):
        """data is sampled over a multiple of the period, e.g. t = int(10/f+1)
        The code uses heuristic parameters for a special context.
        """
        data = data.transpose()[0]  # first channel of an (Ns, Nch) array
        N = data.size
        ssf = sps.medfilt(data,29)  # filter first
        amplitudes = np.abs(np.fft.rfft(ssf))*2/N  # then do an FFT
        f_idx = int(f*N/f_s)  # FFT bin of the expected frequency
        allthere = []
        # rectangular signal
        # check that in several modes the same frequency is present
        for m in range(1,4):
            Nwindow = 9
            rng = np.arange(-Nwindow,Nwindow,dtype=int)  # np.int no longer exists in NumPy
            amps = amplitudes[m*f_idx+rng]
            imax_in_rng = m*f_idx + rng[np.argmax(amps)]
            there = abs(m*f_idx - imax_in_rng) <= 1  # allow for f tolerance
            allthere.append(there)
        return all(allthere)
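The core of this check, looking for the strongest spectral line near the expected FFT bin, can be tried on synthetic data. The following is a simplified sketch and not the function above: it leaves out the median filter and the check of the higher modes, and the name peak_near, the noise level and the test frequencies are assumptions made for the illustration.

```python
import numpy as np

def peak_near(data, f, f_s, window=9, tol=1):
    """True if the strongest line within +/-window bins of f is at most tol bins off."""
    n = data.size
    amplitudes = np.abs(np.fft.rfft(data)) * 2 / n
    f_idx = int(round(f * n / f_s))            # bin of the expected frequency
    lo = max(f_idx - window, 1)                # skip the DC bin
    hi = min(f_idx + window + 1, amplitudes.size)
    peak = lo + int(np.argmax(amplitudes[lo:hi]))
    return abs(peak - f_idx) <= tol

f_s = 1000
t = np.arange(3 * f_s) / f_s                   # 3 s of samples, 15 periods of 5 Hz
rng = np.random.default_rng(0)                 # fixed seed for a repeatable result
square = np.sign(np.sin(2 * np.pi * 5 * t))    # 5 Hz rectangular signal
noisy = square + rng.normal(0.0, 0.5, t.size)

found = peak_near(noisy, 5, f_s)               # 5 Hz is in the signal
missing = peak_near(noisy, 8, f_s)             # 8 Hz is not
```

With 3 s of data the bin spacing is 1/3 Hz, so 5 Hz falls exactly on bin 15. This is why the signal should be sampled over a multiple of the period.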
Writing and reading from digital lines
StartTask() is not needed for these single-sample reads and writes, as the task is started automatically.
    import time
    from ctypes import byref

    import numpy as np
    from PyDAQmx import *

    DEV = "Dev2"

    class digital_in_out:
        def __init__(self):
            self.tDO = Task()
            self.DO_names = "FootEnable,HandAbort,HandStart,DI,RC1,RC2,RC3,RC4".split(',')
            # nameToAssignToLines takes one string, not a list; "" keeps the default name
            self.tDO.CreateDOChan(DEV+"/port0/line0:7","",DAQmx_Val_ChanForAllLines)
            self.do = np.array([0]*8,np.uint8)
            self.tDI = Task()
            self.DI_names = "DO1,DO2,Stim,Ready,Leadoff,Current,Power".split(',')
            self.tDI.CreateDIChan(DEV+"/port2/line0:6","",DAQmx_Val_ChanForAllLines)
            self.di = np.array([0]*7,np.uint8)
        def close(self):
            # Call this explicitly when done; Python never calls a method named __end__
            self.tDO.StopTask()
            self.tDO.ClearTask()
            self.tDI.StopTask()
            self.tDI.ClearTask()
        def set(self, what):
            for w in what:
                self.do[self.DO_names.index(w)] = 1
            # autoStart=1, so no StartTask() is needed
            self.tDO.WriteDigitalLines(1,1,10.0,DAQmx_Val_GroupByChannel,self.do,None,None)
        def clear(self, what):
            for w in what:
                self.do[self.DO_names.index(w)] = 0
            self.tDO.WriteDigitalLines(1,1,10.0,DAQmx_Val_GroupByChannel,self.do,None,None)
        def press(self, what):
            self.set(what)
            time.sleep(0.2)
            self.clear(what)
        def read(self, what):
            readDI = int32()    # receives the number of bytes per sample
            sampleDI = int32()  # receives the number of samples read
            self.tDI.ReadDigitalLines(1,10.0,DAQmx_Val_GroupByChannel,self.di,len(self.DI_names),byref(sampleDI),byref(readDI),None)
            return int(self.di[self.DI_names.index(what)])

    dio = digital_in_out()
    dio.set("RC1 RC2".split())
    dio.press(["HandStart"])
    dio.read("Stim")
    #...
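The class keeps one byte per line and looks up the byte to change by the line's name. That bookkeeping can be sketched without hardware; line_states is a hypothetical helper for this illustration, not a PyDAQmx function.

```python
import numpy as np

DO_NAMES = "FootEnable,HandAbort,HandStart,DI,RC1,RC2,RC3,RC4".split(",")

def line_states(names, active):
    """Return a uint8 array with 1 for every line named in active, else 0."""
    states = np.zeros(len(names), dtype=np.uint8)
    for name in active:
        states[names.index(name)] = 1   # raises ValueError for an unknown name
    return states

# Same selection as dio.set("RC1 RC2".split()) in the class above.
states = line_states(DO_NAMES, ["RC1", "RC2"])
```

An array like states, one uint8 per line in the order of the channel's lines, is what the class hands to WriteDigitalLines().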