Hi all, My lab is doing experiments involving sweeping instrument outputs and reading back data. This is currently done using Keysight's proprietary Labber software that uses instrument drivers written in python to configure connections. The driver files can be found here https://github.com/Labber-software/Drivers . If I can port the driver for the Agilent 5230 network analyzer (attached) I may be able to switch from Keysight to sigrok.
I'm hoping to avoid rewriting the existing drivers in C. Does sigrok have any support built in for drivers in python? Has anyone done a similar port that I can look at for reference? Thanks, Theo
#!/usr/bin/env python from VISA_Driver import VISA_Driver import numpy as np import os.path __version__ = "0.0.1" class Error(Exception): pass class Driver(VISA_Driver): """ This class implements the Agilent 5230 PNA driver""" def performOpen(self, options={}): """Perform the operation of opening the instrument connection""" # init meas param dict self.dMeasParam = {} # calling the generic VISA open to make sure we have a connection VISA_Driver.performOpen(self, options=options) # do perform get value for acquisition mode def performSetValue(self, quant, value, sweepRate=0.0, options={}): """Perform the Set Value instrument operation. This function should return the actual value set by the instrument""" if self.isFinalCall(options) and self.getValue('Sweep type') == 'Lorentzian': # get parameters centerFreq = self.getValue('Center frequency') qEst = self.getValue('Q Value') thetaMax = self.getValue('Maximum Angle') numPoints = self.getValue('# of points') # calculate distribution frequencies = self.calcLorentzianDistr(thetaMax, numPoints, qEst, centerFreq) data = [] for freq in frequencies: data.append('1') data.append('1') data.append(str(freq)) data.append(str(freq)) dataset = ','.join(data) self.writeAndLog('SENS:SEGM:LIST SSTOP, %s, %s' % (numPoints, dataset)) # update visa commands for triggers if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled', 'S22 - Enabled'): if self.getModel() in ('E5071C',): # new trace handling, use trace numbers, set all at once lParam = ['S11', 'S21', 'S12', 'S22'] dParamValue = dict() for param in lParam: dParamValue[param] = self.getValue('%s - Enabled' % param) dParamValue[quant.name[:3]] = value # add parameters, if enabled self.dMeasParam = dict() for (param, enabled) in dParamValue.items(): if enabled: nParam = len(self.dMeasParam)+1 self.writeAndLog(":CALC:PAR%d:DEF %s" % (nParam, param)) self.dMeasParam[param] = nParam # set number of visible traces self.writeAndLog(":CALC:PAR:COUN %d" % len(self.dMeasParam)) else: # get updated list of measurements in use self.getActiveMeasurements() param = quant.name[:3] # old-type handling of traces if param in self.dMeasParam: # clear old measurements for this parameter for name in self.dMeasParam[param]: self.writeAndLog("CALC:PAR:DEL '%s'" % name) # create new measurement, if enabled is true if value: newName = 'LabC_%s' % param self.writeAndLog("CALC:PAR:EXT '%s','%s'" % (newName, param)) # show on PNA screen iTrace = 1 + ['S11', 'S21', 'S12', 'S22'].index(param) # sPrev = self.askAndLog('DISP:WIND:CAT?') # if sPrev.find('EMPTY')>0: # # no previous traces # iTrace = 1 # else: # # previous traces, add new # lTrace = sPrev[1:-1].split(',') # iTrace = int(lTrace[-1]) + 1 self.writeAndLog("DISP:WIND:TRAC%d:FEED '%s'" % (iTrace, newName)) # add to dict with list of measurements self.dMeasParam[param] = [newName] elif quant.name in ('Wait for new trace',): # do nothing pass elif quant.name in ('Range type',): # change range if single point if value == 'Single frequency': self.writeAndLog(':SENS:FREQ:SPAN 0') self.writeAndLog(':SENS:SWE:POIN 1') elif quant.name in ('Sweep type'): # if linear: if self.getValue('Sweep type') == 'Linear': self.writeAndLog(':SENS:SWE:TYPE LIN') #if log: elif self.getValue('Sweep type') == 'Log': self.writeAndLog(':SENS:SWE:TYPE LOG') # if Lorentzian: elif self.getValue('Sweep type') == 'Lorentzian': # prepare VNA for segment sweep self.writeAndLog(':SENS:SWE:TYPE SEGM') self.writeAndLog('DISP:WIND:TABL SEGM') else: # run standard VISA case value = VISA_Driver.performSetValue(self, quant, value, sweepRate, options) return value def performGetValue(self, quant, options={}): """Perform the Get Value instrument operation""" # check type of quantity if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled', 'S22 - Enabled'): # update list of channels in use self.getActiveMeasurements() # get selected parameter param = quant.name[:3] value = (param in self.dMeasParam) elif quant.name in ('S11 - Value', 'S21 - Value', 'S12 - Value', 'S22 - Value'): # read trace, return averaged data data = self.readValueFromOther(quant.name[:3]) return np.mean(data['y']) elif quant.name in ('S11', 'S21', 'S12', 'S22'): # check if channel is on if quant.name not in self.dMeasParam: # get active measurements again, in case they changed self.getActiveMeasurements() if quant.name in self.dMeasParam: if self.getModel() in ('E5071C',): # new trace handling, use trace numbers self.writeAndLog("CALC:PAR%d:SEL" % self.dMeasParam[quant.name]) else: # old parameter handing, select parameter (use last in list) sName = self.dMeasParam[quant.name][-1] self.writeAndLog("CALC:PAR:SEL '%s'" % sName) # if not in continous mode, trig from computer bWaitTrace = self.getValue('Wait for new trace') bAverage = self.getValue('Average') # wait for trace, either in averaging or normal mode if bWaitTrace: if bAverage: # set channels 1-4 to set event when average complete (bit 1 start) self.writeAndLog(':SENS:AVER:CLE;:STAT:OPER:AVER1:ENAB 30;:ABOR;:SENS:AVER:CLE;') else: self.writeAndLog(':ABOR;:INIT:CONT OFF;:INIT:IMM;') self.writeAndLog('*OPC') # wait some time before first check self.wait(0.03) bDone = False while (not bDone) and (not self.isStopped()): # check if done if bAverage: sAverage = self.askAndLog('STAT:OPER:AVER1:COND?') bDone = int(sAverage)>0 else: stb = int(self.askAndLog('*ESR?')) bDone = (stb & 1) > 0 if not bDone: self.wait(0.1) # if stopped, don't get data if self.isStopped(): self.writeAndLog('*CLS;:INIT:CONT ON;') return [] # get data as float32, convert to numpy array if self.getModel() in ('E5071C',): # new trace handling, use trace numbers self.write(':FORM:DATA REAL32;:CALC:SEL:DATA:SDAT?', bCheckError=False) else: # old parameter handing self.write(':FORM REAL,32;CALC:DATA? SDATA', bCheckError=False) sData = self.read(ignore_termination=True) if bWaitTrace and not bAverage: self.writeAndLog(':INIT:CONT ON;') # strip header to find # of points i0 = sData.find(b'#') nDig = int(sData[i0+1:i0+2]) nByte = int(sData[i0+2:i0+2+nDig]) nData = int(nByte/4) nPts = int(nData/2) # get data to numpy array vData = np.frombuffer(sData[(i0+2+nDig):(i0+2+nDig+nByte)], dtype='>f', count=nData) # data is in I0,Q0,I1,Q1,I2,Q2,.. format, convert to complex mC = vData.reshape((nPts,2)) vComplex = mC[:,0] + 1j*mC[:,1] # get start/stop frequencies centerFreq = self.readValueFromOther('Center frequency') sweepType = self.readValueFromOther('Sweep type') # if log scale, take log of start/stop frequencies logX = (sweepType == 'Log') lorX = (sweepType == 'Lorentzian') if lorX: qEst = self.getValue('Q Value') thetaMax = self.getValue('Maximum Angle') numPoints = self.getValue('# of points') value = quant.getTraceDict(vComplex, x=self.calcLorentzianDistr(thetaMax, numPoints, qEst, centerFreq)) else: span = self.readValueFromOther('Span') startFreq = centerFreq - (span/2) stopFreq = centerFreq + (span/2) value = quant.getTraceDict(vComplex, x0=startFreq, x1=stopFreq, logX=logX) else: # not enabled, return empty array value = quant.getTraceDict([]) elif quant.name in ('Wait for new trace',): # do nothing, return local value value = quant.getValue() else: # for all other cases, call VISA driver value = VISA_Driver.performGetValue(self, quant, options) return value def getActiveMeasurements(self): """Retrieve and a list of measurement/parameters currently active""" # proceed depending on model if self.getModel() in ('E5071C',): # in this case, meas param is just a trace number self.dMeasParam = {} # get number or traces nTrace = int(self.askAndLog(":CALC:PAR:COUN?")) # get active trace names, one by one for n in range(nTrace): sParam = self.askAndLog(":CALC:PAR%d:DEF?" % (n+1)) self.dMeasParam[sParam] = (n+1) else: sAll = self.askAndLog("CALC:PAR:CAT:EXT?") # strip "-characters sAll = sAll[1:-1] # parse list, format is channel, parameter, ... self.dMeasParam = {} lAll = sAll.split(',') nMeas = len(lAll)//2 for n in range(nMeas): sName = lAll[2*n] sParam = lAll[2*n + 1] if sParam not in self.dMeasParam: # create list with current name self.dMeasParam[sParam] = [sName,] else: # add to existing list self.dMeasParam[sParam].append(sName) # helper function to calculate Lorentzian frequency distribution def calcLorentzianDistr(self, thetaMax, numPoints, qEst, centerFreq): theta = np.linspace(-thetaMax, thetaMax, numPoints) freq = np.multiply(centerFreq, (1 - np.multiply(1 / (2*qEst), np.tan(np.divide(theta, 2))))) return freq if __name__ == '__main__': pass
# Instrument driver configuration file. [General settings] # The name is shown in all the configuration windows name: Agilent Network Analyzer # The version string should be updated whenever changes are made to this config file version: 2.0 # Name of folder containing the code defining a custom driver. Do not define this item # or leave it blank for any standard driver based on the built-in VISA interface. driver_path: Agilent_NetworkAnalyzer # default interface and address interface: TCPIP [Model and options] # The option section allow instruments with different options to use the same driver # List of models supported by this driver model_str_1: N5230 model_str_2: E8364B model_str_3: E5071C model_str_4: E5063A model_str_5: N5232 model_str_6: N5222 model_str_7: N5231 # Check instrument model id at startup (True or False). Default is False check_model: True # Valid model strings returned by the instrument. Default value = model_str model_id_1: N5230 model_id_2: E8364B model_id_3: E5071C model_id_4: E5063A model_id_5: N5232 model_id_6: N5222 model_id_7: N5231 # General VISA settings for the instrument. [VISA settings] # Enable or disable communication over the VISA protocol (True or False) # If False, the driver will not perform any operations (unless there is a custom driver). use_visa = True # Reset the interface (not the instrument) at startup (True or False). Default is False reset: True # Time (in seconds) before the timing out while waiting for an instrument response. Default is 5 timeout: 10 # Query instrument errors (True or False). If True, every command sent to the device will # be followed by an error query. This is useful when testing new setups, but may degrade # performance by slowing down the instrument communication. query_instr_errors: False # Bit mask for checking status byte errors (default is 255, include all errors) # The bits signal the following errors: # 0: Operation # 1: Request control # 2: Query error # 3: Device error # 4: Execution error # 5: Command error # 6: User request # 7: Power on error_bit_mask: 255 # SCPI string to be used when querying for instrument error messages error_cmd: # Initialization commands are sent to the instrument when starting the driver # *RST will reset the device, *CLS clears the interface init: :INIT:CONT ON;:FORM:BORD NORM; # Boolean string values (used for sending True/False to instrument), default is 1 and 0 #str_true: ON #str_false: OFF # Final commands sent to the instrument when closing the driver final: # Define quantities in sections. This list is a selection of allowed keywords, # see the manual for a full list of options # datatype: The datatype should be one of DOUBLE, BOOLEAN, COMBO, # STRING, COMPLEX, VECTOR, VECTOR_COMPLEX, PATH or BUTTON. # unit: Quantity unit # set_cmd: Command used to send data to the instrument. Put <*> where the value should appear. # get_cmd: Command used to get the data from the instrument. Default is set_cmd? # def_value: Default value # low_lim: Lowest allowable value. Defaults to -INF # high_lim: Highest allowable values. Defaults to +INF # combo_def_1: First option in a pull-down combo box. Only used when datatype=COMBO # combo_def_2: Second option in a pull-down combo box. Only used when datatype=COMBO # ... # combo_def_n: nth option in a pull-down combo box. Only used when datatype=COMBO # state_quant: Quantity that determines this control's visibility # state_value_1: Value of "state_quant" for which the control is visible # state_value_2: Value of "state_quant" for which the control is visible # ... # state_value_n: Value of "state_quant" for which the control is visible # permission: Sets read/writability, options are BOTH, READ, WRITE or NONE. Default is BOTH # group: Name of the group where the control belongs. # section: Name of the section where the control belongs. [S11 - Enabled] datatype: BOOLEAN def_value: False group: Signals [S21 - Enabled] datatype: BOOLEAN def_value: False group: Signals [S12 - Enabled] datatype: BOOLEAN def_value: False group: Signals [S22 - Enabled] datatype: BOOLEAN def_value: False group: Signals [S11] x_name: Frequency x_unit: Hz datatype: VECTOR_COMPLEX permission: READ group: Signals [S21] x_name: Frequency x_unit: Hz datatype: VECTOR_COMPLEX permission: READ group: Signals [S12] x_name: Frequency x_unit: Hz datatype: VECTOR_COMPLEX permission: READ group: Signals [S22] x_name: Frequency x_unit: Hz datatype: VECTOR_COMPLEX permission: READ group: Signals [S11 - Value] datatype: COMPLEX unit: Hz permission: READ group: Signals [S21 - Value] datatype: COMPLEX unit: Hz permission: READ group: Signals [S12 - Value] datatype: COMPLEX unit: Hz permission: READ group: Signals [S22 - Value] datatype: COMPLEX unit: Hz permission: READ group: Signals [Output enabled] datatype: BOOLEAN def_value: False set_cmd: :OUTP group: Output [Output power] datatype: DOUBLE def_value: -30.0 unit: dBm set_cmd: :SOUR:POW group: Output [IF bandwidth] datatype: DOUBLE def_value: 10E3 unit: Hz set_cmd: :SENS:BWID group: Acquisition [Average] datatype: BOOLEAN def_value: False set_cmd: :SENS:AVER group: Acquisition [# of averages] datatype: DOUBLE def_value: 10 low_lim: 1 high_lim: 65536 set_cmd: :SENS:AVER:COUN group: Acquisition [Wait for new trace] datatype: BOOLEAN def_value: False group: Acquisition [Range type] datatype: COMBO def_value: Start - Stop combo_def_1: Center - Span combo_def_2: Start - Stop combo_def_3: Single frequency permission: WRITE group: Horizontal [Start frequency] datatype: DOUBLE def_value: 4E9 unit: Hz set_cmd: :SENS:FREQ:STAR state_quant: Range type state_value_1: Start - Stop group: Horizontal [Stop frequency] datatype: DOUBLE def_value: 12E9 unit: Hz set_cmd: :SENS:FREQ:STOP state_quant: Range type state_value_1: Start - Stop group: Horizontal [Center frequency] datatype: DOUBLE def_value: 8E9 unit: Hz set_cmd: :SENS:FREQ:CENT state_quant: Range type state_value_1: Center - Span state_value_2: Single frequency group: Horizontal [Span] datatype: DOUBLE def_value: 8E9 unit: Hz set_cmd: :SENS:FREQ:SPAN state_quant: Range type state_value_1: Center - Span group: Horizontal [# of points] datatype: DOUBLE def_value: 201 set_cmd: :SENS:SWE:POIN state_quant: Range type state_value_1: Center - Span state_value_2: Start - Stop group: Horizontal [Sweep type] datatype: COMBO def_value: Linear combo_def_1: Linear combo_def_2: Log combo_def_3: Lorentzian group: Horizontal [Q Value] datatype: DOUBLE def_value: 1.0 group: Horizontal state_quant: Sweep type state_value_1: Lorentzian [Maximum Angle] datatype: DOUBLE unit: rad def_value: 0.0 group: Horizontal state_quant: Sweep type state_value_1: Lorentzian [Sweep mode] datatype: COMBO def_value: Stepped combo_def_1: Stepped combo_def_2: Analog cmd_def_1: STEP cmd_def_2: ANAL set_cmd: :SENS:SWE:GEN group: Horizontal
_______________________________________________ sigrok-devel mailing list sigrok-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/sigrok-devel