Thursday, August 18, 2016

NI DAQmx

Data Acquisition with the NI USB-6212

Data Acquisition with the NI USB-6212

There is a C API.

Help for the C API is installed in:

C:\Program Files (x86)\National Instruments\NI-DAQ\Docs\cdaqmx.chm

But the usage is best learned via the examples in:

C:\Users\public\Documents\National Instruments\NI-DAQ\Examples\DAQmx ANSI C

I worked with the PyDAQmx Python wrapper. The wrapper's function names are the C ones without the DAQmx prefix, so the same help as for the ANSI C functions can be used. The tests that come with the Python wrapper are a good starting point.

Here are some tests of mine.

Write to analogue out and read back in at the same time

Physically connect ao0 with ai0. The code writes a signal to ao0 and reads it back in via ai0. Dev2 is the device as shown in NI MAX (Measurement & Automation Explorer).

import ctypes
import numpy as np
import time
import matplotlib.pyplot as plt
from PyDAQmx import *

# Loop-back test: generate a waveform on ao0 and acquire it again on ai0.
DEV = "Dev2"   # device name as shown in NI MAX
f_s = 1000     # sample clock rate; also used as the number of samples per channel

# Output waveform sin(x)/x with x = 24*v/f_s for v = 1..f_s.
# np.sin is used because only numpy is imported as a math library here.
v = np.arange(1, f_s + 1, dtype=np.float64)
dataO = np.sin(24.0 * v / f_s) * f_s / v / 24.0

tAI = Task()
tAO = Task()
try:
    tAI.CreateAIVoltageChan(DEV+"/ai0","Voltage",DAQmx_Val_RSE,-10,10,DAQmx_Val_Volts,None)
    tAI.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_FiniteSamps,f_s)

    tAO.CreateAOVoltageChan(DEV+"/ao0","",-10.0,10.0,DAQmx_Val_Volts,None)
    tAO.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_ContSamps,f_s)

    # Fill the output buffer first (third argument 0 = do not auto-start),
    # then start the generation explicitly.
    written = int32()
    tAO.WriteAnalogF64(f_s,0,10.0,DAQmx_Val_GroupByChannel,dataO,byref(written),None)
    tAO.StartTask()

    # Start the acquisition explicitly and then fetch the samples. Reading
    # before StartTask() would auto-start the task for the read only, and the
    # following StartTask() would begin a second acquisition nobody reads.
    dataI = np.zeros(f_s)
    readI = int32()
    tAI.StartTask()
    tAI.ReadAnalogF64(f_s,10.0,DAQmx_Val_GroupByChannel,dataI,f_s,byref(readI),None)
    tAI.StopTask()

    tAO.StopTask()
finally:
    # Release the device even if one of the calls above raised.
    tAI.ClearTask()
    tAO.ClearTask()

plt.plot(dataI)
plt.show()

Reading more channels at the same time

I tried to create two tasks, each with an input channel. When starting them I got the error:

The specified resource is reserved.

Making the two channels part of the same task works.

import ctypes
import numpy as np
import time
import matplotlib.pyplot as plt
from PyDAQmx import *

# Acquire a voltage channel (ai0) and a current channel (ai1) in ONE task.
DEV = "Dev2"
f_s = 1000       # sample clock rate
t = 1#s
V_MAX = 10       # voltage range is -V_MAX..V_MAX (DAQmx_Val_Volts)
SHUNT = 100      # external shunt resistor value passed to CreateAICurrentChan
I_MAX = 0.1      # current range is -I_MAX..I_MAX (DAQmx_Val_Amps)
timeout = 10.0
Nch = 2

tsk = Task()
try:
    tsk.CreateAIVoltageChan(DEV+"/ai0","Voltage",DAQmx_Val_RSE,-V_MAX,V_MAX,DAQmx_Val_Volts,None)
    # Taking the previous or the next line away makes the commented lines below work
    tsk.CreateAICurrentChan(DEV+"/ai1","Current",DAQmx_Val_Cfg_Default,-I_MAX,I_MAX,DAQmx_Val_Amps,DAQmx_Val_Default,SHUNT,None) #

    # Samples per channel; int() keeps it usable as a buffer size when t is a float.
    Ns = int(t*f_s)
    tsk.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_FiniteSamps,Ns)

    # #A third output channel on a separate task does not work (1)
    # written = int32()
    # dataAO = np.array([np.sin(24.0*v/f_s)*f_s/v/24.0 for v in range(1,f_s+1)])
    # tAO = Task()
    # tAO.CreateAOVoltageChan(DEV+"/ao0","",-10.0,10.0,DAQmx_Val_Volts,None)
    # tAO.CfgSampClkTiming("",f_s,DAQmx_Val_Rising,DAQmx_Val_ContSamps,f_s)
    # tAO.WriteAnalogF64(f_s,0,10.0,DAQmx_Val_GroupByChannel,dataAO,None,None)
    # tAO.StartTask()

    read = int32()
    data = np.zeros(Nch*Ns)
    # Start explicitly, then read: the read returns once Ns samples per
    # channel have arrived (or raises after `timeout` seconds).
    tsk.StartTask()
    tsk.ReadAnalogF64(Ns,timeout,DAQmx_Val_GroupByChannel,data,Nch*Ns,byref(read),None)

    # #A third output channel on a separate task does not work (2)
    #tAO.StopTask()
    #tAO.ClearTask()

    tsk.StopTask()
finally:
    # Release the device even if one of the calls above raised.
    tsk.ClearTask()

# DAQmx_Val_GroupByChannel is non-interleaved: the flat buffer holds all Ns
# samples of ai0 followed by all Ns samples of ai1, i.e. it is (Nch, Ns).
# Transpose to (Ns, Nch) so that every column is one channel for plotting.
data = data.reshape(Nch, Ns).T
plt.plot(data)
plt.show()

Put StartTask() after WriteAnalogF64(). ReadAnalogF64() also works properly without calling StartTask() first, but calling it explicitly keeps control over when the task starts and stops.

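DAQmx_Val_GroupByChannel makes blocks by channel in the buffer (non-interleaved): all Ns samples of the first channel come first, followed by all Ns samples of the second channel. The flat buffer therefore corresponds to an array of shape (Nch,Ns), which has to be transposed to get one column per channel, i.e. shape (Ns,Nch).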

I used the above samples (for t > 1) to verify that a given frequency is present in a rather noisy signal.

def has_frequency(data, f, f_s, n_harmonics=3, half_window=9, kernel_size=29, tolerance=1):
    """Check whether frequency ``f`` and its first harmonics are present in a signal.

    data is sampled over a multiple of the period, e.g. t = int(10/f+1).
    Only the first column of ``data`` (shape ``(Ns, Nch)``) is analysed.

    The signal is median-filtered, transformed with a real FFT, and for each
    multiple ``m*f`` (m = 1..n_harmonics) the largest amplitude in a window of
    bins around the expected bin must lie within ``tolerance`` bins of it.

    The defaults are heuristic parameters for a special context
    (a rectangular signal).

    Parameters:
        data: 2-D array, one column per channel.
        f: frequency to look for, in the same unit as ``f_s``.
        f_s: sampling rate.
        n_harmonics: number of multiples of ``f`` that must all be present.
        half_window: the search window covers the bins
            ``center-half_window .. center+half_window-1``.
        kernel_size: median filter length (must be odd).
        tolerance: allowed distance, in bins, between expected and found peak.

    Returns:
        True if every checked multiple has its local maximum at the expected
        bin, otherwise False. Also False when the signal is empty, when ``f``
        is below the frequency resolution, or when a multiple lies beyond
        the end of the spectrum.
    """
    # Imported here so the function does not depend on a module-level alias.
    import scipy.signal as sps

    samples = np.asarray(data).transpose()[0]
    N = samples.size
    if N == 0:
        return False

    smoothed = sps.medfilt(samples, kernel_size)  # filter first
    amplitudes = np.abs(np.fft.rfft(smoothed)) * 2 / N  # then do an FFT

    f_idx = int(f * N / f_s)
    if f_idx < 1:
        # Bin 0 is the DC component; f cannot be told apart from it.
        return False

    # rectangular signal
    # check that in several modes the same frequency is present
    for m in range(1, n_harmonics + 1):
        center = m * f_idx
        # Clip the window to valid bins so that it neither wraps around to
        # the end of the spectrum nor runs past it.
        lo = max(center - half_window, 0)
        hi = min(center + half_window, amplitudes.size)
        if lo >= hi:
            return False
        peak = lo + int(np.argmax(amplitudes[lo:hi]))
        if abs(center - peak) > tolerance:  # allow for f tolerance
            return False
    return True

Writing and reading from digital lines

StartTask() is not needed, as it is called automatically.

class digital_in_out:
    """Named access to the digital lines of the device.

    Lines 0-7 of port0 are driven as outputs, lines 0-6 of port2 are read as
    inputs. Each port is configured as one channel covering all its lines
    (DAQmx_Val_ChanForAllLines); the per-line names exist only on the Python
    side, in ``DO_names`` and ``DI_names``, in line order.

    Can be used as a context manager so the tasks are released on exit.
    """

    def __init__(self, dev=None):
        """Create the output and input tasks.

        dev: device name; defaults to the module-level ``DEV``.
        """
        if dev is None:
            dev = DEV
        self.DO_names = "FootEnable,HandAbort,HandStart,DI,RC1,RC2,RC3,RC4".split(',')
        self.DI_names = "DO1,DO2,Stim,Ready,Leadoff,Current,Power".split(',')

        self.tDO = Task()
        # One channel spans all lines, so an empty name (as used for the
        # analog channels) lets the driver pick the channel name.
        self.tDO.CreateDOChan(dev+"/port0/line0:7","",DAQmx_Val_ChanForAllLines)
        self.do = np.zeros(len(self.DO_names), np.uint8)

        self.tDI = Task()
        self.tDI.CreateDIChan(dev+"/port2/line0:6","",DAQmx_Val_ChanForAllLines)
        self.di = np.zeros(len(self.DI_names), np.uint8)

    def close(self):
        """Stop and clear both tasks."""
        for task in (self.tDO, self.tDI):
            task.StopTask()
            task.ClearTask()

    # Kept for backward compatibility: the cleanup used to be called __end__,
    # which Python never invokes on its own.
    __end__ = close

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _write(self, what, value):
        """Set the named output lines to ``value`` and write all lines out."""
        if isinstance(what, str):
            # A single name instead of a list of names.
            what = [what]
        for w in what:
            self.do[self.DO_names.index(w)] = value
        self.tDO.WriteDigitalLines(1,1,10.0,DAQmx_Val_GroupByChannel,self.do,None,None)

    def set(self, what):
        """Switch the output lines named in ``what`` on."""
        self._write(what, 1)

    def clear(self, what):
        """Switch the output lines named in ``what`` off."""
        self._write(what, 0)

    def press(self, what):
        """Pulse the named output lines: on, wait 0.2 s, off."""
        self.set(what)
        time.sleep(0.2)
        self.clear(what)

    def read(self, what):
        """Read all input lines and return the state of line ``what`` as int."""
        bytes_per_samp = int32()
        samps_read = int32()
        self.tDI.ReadDigitalLines(1,10.0,DAQmx_Val_GroupByChannel,self.di,len(self.DI_names),byref(samps_read),byref(bytes_per_samp),None)
        return int(self.di[self.DI_names.index(what)])

# Example session with the wrapper class.
dio = digital_in_out()
dio.set(["RC1", "RC2"])     # switch the RC1 and RC2 outputs on
dio.press(["HandStart"])    # pulse HandStart: on, 0.2 s pause, off
dio.read("Stim")            # state of the Stim input line as an int
#...

No comments:

Post a Comment