Hi all,

My lab is doing experiments involving sweeping instrument outputs and reading 
back data. This is currently done using Keysight's proprietary Labber software 
that uses instrument drivers written in python to configure connections. The 
driver files can be found here https://github.com/Labber-software/Drivers . If 
I can port the driver for the Agilent 5230 network analyzer (attached) I may be 
able to switch from Keysight to sigrok.

I'm hoping to avoid rewriting the existing drivers in C. Does sigrok have any 
support built in for drivers in python? Has anyone done a similar port that I 
can look at for reference?

Thanks,

Theo

#!/usr/bin/env python

from VISA_Driver import VISA_Driver
import numpy as np
import os.path

__version__ = "0.0.1"

class Error(Exception):
    pass

class Driver(VISA_Driver):
    """ This class implements the Agilent 5230 PNA driver"""

    def performOpen(self, options={}):
        """Perform the operation of opening the instrument connection"""
        # init meas param dict
        self.dMeasParam = {}
        # calling the generic VISA open to make sure we have a connection
        VISA_Driver.performOpen(self, options=options)
        # do perform get value for acquisition mode


    def performSetValue(self, quant, value, sweepRate=0.0, options={}):
        """Perform the Set Value instrument operation. This function should
        return the actual value set by the instrument"""
        if self.isFinalCall(options) and self.getValue('Sweep type') == 
'Lorentzian':
                 # get parameters
                centerFreq = self.getValue('Center frequency')
                qEst = self.getValue('Q Value')
                thetaMax = self.getValue('Maximum Angle')
                numPoints = self.getValue('# of points')
                # calculate distribution
                frequencies = self.calcLorentzianDistr(thetaMax, numPoints, 
qEst, centerFreq)
                data = []
                for freq in frequencies:
                    data.append('1')
                    data.append('1')
                    data.append(str(freq))
                    data.append(str(freq))
                dataset = ','.join(data)
                self.writeAndLog('SENS:SEGM:LIST SSTOP, %s, %s' % (numPoints, 
dataset))
        # update visa commands for triggers
        if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled',
                          'S22 - Enabled'):
            if self.getModel() in ('E5071C',):
                # new trace handling, use trace numbers, set all at once
                lParam = ['S11', 'S21', 'S12', 'S22']
                dParamValue = dict()
                for param in lParam:
                    dParamValue[param] = self.getValue('%s - Enabled' % param)
                dParamValue[quant.name[:3]] = value
                # add parameters, if enabled
                self.dMeasParam = dict()
                for (param, enabled) in dParamValue.items():
                    if enabled:
                        nParam = len(self.dMeasParam)+1
                        self.writeAndLog(":CALC:PAR%d:DEF %s" %
                                         (nParam, param))
                        self.dMeasParam[param] = nParam
                # set number of visible traces
                self.writeAndLog(":CALC:PAR:COUN %d" % len(self.dMeasParam))
            else:
                # get updated list of measurements in use
                self.getActiveMeasurements()
                param = quant.name[:3]
                # old-type handling of traces
                if param in self.dMeasParam:
                    # clear old measurements for this parameter
                    for name in self.dMeasParam[param]:
                        self.writeAndLog("CALC:PAR:DEL '%s'" % name)
                # create new measurement, if enabled is true
                if value:
                    newName = 'LabC_%s' % param
                    self.writeAndLog("CALC:PAR:EXT '%s','%s'" % (newName, 
param))
                    # show on PNA screen
                    iTrace = 1 + ['S11', 'S21', 'S12', 'S22'].index(param)
    #                sPrev = self.askAndLog('DISP:WIND:CAT?')
    #                if sPrev.find('EMPTY')>0:
    #                    # no previous traces
    #                    iTrace = 1
    #                else:
    #                    # previous traces, add new
    #                    lTrace = sPrev[1:-1].split(',')
    #                    iTrace = int(lTrace[-1]) + 1
                    self.writeAndLog("DISP:WIND:TRAC%d:FEED '%s'" % (iTrace, 
newName))
                    # add to dict with list of measurements
                    self.dMeasParam[param] = [newName]
        elif quant.name in ('Wait for new trace',):
            # do nothing
            pass
        elif quant.name in ('Range type',):
            # change range if single point
            if value == 'Single frequency':
                self.writeAndLog(':SENS:FREQ:SPAN 0')
                self.writeAndLog(':SENS:SWE:POIN 1')

        elif quant.name in ('Sweep type'):
            # if linear:
            if self.getValue('Sweep type') == 'Linear':
                self.writeAndLog(':SENS:SWE:TYPE LIN')
            #if log:
            elif self.getValue('Sweep type') == 'Log':
                self.writeAndLog(':SENS:SWE:TYPE LOG')
            # if Lorentzian:
            elif self.getValue('Sweep type') == 'Lorentzian':
                # prepare VNA for segment sweep
                self.writeAndLog(':SENS:SWE:TYPE SEGM') 
                self.writeAndLog('DISP:WIND:TABL SEGM') 
        else:
            # run standard VISA case 
            value = VISA_Driver.performSetValue(self, quant, value, sweepRate, 
options)
        return value


    def performGetValue(self, quant, options={}):
        """Perform the Get Value instrument operation"""
        # check type of quantity
        if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled',
                          'S22 - Enabled'):
            # update list of channels in use
            self.getActiveMeasurements()
            # get selected parameter
            param = quant.name[:3]
            value = (param in self.dMeasParam)
        elif quant.name in ('S11 - Value', 'S21 - Value', 'S12 - Value', 'S22 - 
Value'):
            # read trace, return averaged data
            data = self.readValueFromOther(quant.name[:3])
            return np.mean(data['y'])
        elif quant.name in ('S11', 'S21', 'S12', 'S22'):
            # check if channel is on
            if quant.name not in self.dMeasParam:
                # get active measurements again, in case they changed
                self.getActiveMeasurements()
            if quant.name in self.dMeasParam:
                if self.getModel() in ('E5071C',):
                    # new trace handling, use trace numbers
                    self.writeAndLog("CALC:PAR%d:SEL" % 
self.dMeasParam[quant.name])
                else:
                    # old parameter handing, select parameter (use last in list)
                    sName = self.dMeasParam[quant.name][-1]
                    self.writeAndLog("CALC:PAR:SEL '%s'" % sName)
                # if not in continous mode, trig from computer
                bWaitTrace = self.getValue('Wait for new trace')
                bAverage = self.getValue('Average')
                # wait for trace, either in averaging or normal mode
                if bWaitTrace:
                    if bAverage:
                        # set channels 1-4 to set event when average complete 
(bit 1 start)
                        self.writeAndLog(':SENS:AVER:CLE;:STAT:OPER:AVER1:ENAB 
30;:ABOR;:SENS:AVER:CLE;')
                    else:
                        self.writeAndLog(':ABOR;:INIT:CONT OFF;:INIT:IMM;')
                        self.writeAndLog('*OPC') 
                    # wait some time before first check
                    self.wait(0.03)
                    bDone = False
                    while (not bDone) and (not self.isStopped()):
                        # check if done
                        if bAverage:
                            sAverage = self.askAndLog('STAT:OPER:AVER1:COND?')
                            bDone = int(sAverage)>0
                        else:
                            stb = int(self.askAndLog('*ESR?'))
                            bDone = (stb & 1) > 0
                        if not bDone:
                            self.wait(0.1)
                    # if stopped, don't get data
                    if self.isStopped():
                        self.writeAndLog('*CLS;:INIT:CONT ON;')
                        return []
                # get data as float32, convert to numpy array
                if self.getModel() in ('E5071C',):
                    # new trace handling, use trace numbers
                    self.write(':FORM:DATA REAL32;:CALC:SEL:DATA:SDAT?', 
bCheckError=False)
                else:
                    # old parameter handing
                    self.write(':FORM REAL,32;CALC:DATA? SDATA', 
bCheckError=False)
                sData = self.read(ignore_termination=True)
                if bWaitTrace and not bAverage:
                    self.writeAndLog(':INIT:CONT ON;')
                # strip header to find # of points
                i0 = sData.find(b'#')
                nDig = int(sData[i0+1:i0+2])
                nByte = int(sData[i0+2:i0+2+nDig])
                nData = int(nByte/4)
                nPts = int(nData/2)
                # get data to numpy array
                vData = np.frombuffer(sData[(i0+2+nDig):(i0+2+nDig+nByte)], 
                                      dtype='>f', count=nData)
                # data is in I0,Q0,I1,Q1,I2,Q2,.. format, convert to complex
                mC = vData.reshape((nPts,2))
                vComplex = mC[:,0] + 1j*mC[:,1]
                # get start/stop frequencies
                centerFreq = self.readValueFromOther('Center frequency')
                sweepType = self.readValueFromOther('Sweep type')
                # if log scale, take log of start/stop frequencies
                logX = (sweepType == 'Log')
                lorX = (sweepType == 'Lorentzian')
                if lorX:
                    qEst = self.getValue('Q Value')
                    thetaMax = self.getValue('Maximum Angle')
                    numPoints = self.getValue('# of points')
                    value = quant.getTraceDict(vComplex, 
x=self.calcLorentzianDistr(thetaMax, numPoints, qEst, centerFreq))
                else:
                    span = self.readValueFromOther('Span')
                    startFreq = centerFreq - (span/2)
                    stopFreq = centerFreq + (span/2)
                    value = quant.getTraceDict(vComplex, x0=startFreq, 
x1=stopFreq,
                                               logX=logX)
            else:
                # not enabled, return empty array
                value = quant.getTraceDict([])
        elif quant.name in ('Wait for new trace',):
            # do nothing, return local value
            value = quant.getValue()
        else:
            # for all other cases, call VISA driver
            value = VISA_Driver.performGetValue(self, quant, options)
        return value
        

    def getActiveMeasurements(self):
        """Retrieve and a list of measurement/parameters currently active"""
        # proceed depending on model
        if self.getModel() in ('E5071C',):
            # in this case, meas param is just a trace number
            self.dMeasParam = {}
            # get number or traces
            nTrace = int(self.askAndLog(":CALC:PAR:COUN?"))
            # get active trace names, one by one
            for n in range(nTrace):
                sParam = self.askAndLog(":CALC:PAR%d:DEF?" % (n+1))
                self.dMeasParam[sParam] = (n+1)
        else:
            sAll = self.askAndLog("CALC:PAR:CAT:EXT?")
            # strip "-characters
            sAll = sAll[1:-1]
            # parse list, format is channel, parameter, ...
            self.dMeasParam = {}
            lAll = sAll.split(',')
            nMeas = len(lAll)//2
            for n in range(nMeas):
                sName = lAll[2*n] 
                sParam = lAll[2*n + 1]
                if sParam not in self.dMeasParam:
                    # create list with current name
                    self.dMeasParam[sParam] = [sName,]
                else:
                    # add to existing list
                    self.dMeasParam[sParam].append(sName)
    
    # helper function to calculate Lorentzian frequency distribution 
    def calcLorentzianDistr(self, thetaMax, numPoints, qEst, centerFreq):
        theta = np.linspace(-thetaMax, thetaMax, numPoints)
        freq = np.multiply(centerFreq, (1 - np.multiply(1 / (2*qEst), 
np.tan(np.divide(theta, 2)))))
        return freq
    


if __name__ == '__main__':
    pass
# Instrument driver configuration file.

[General settings]

# The name is shown in all the configuration windows
name: Agilent Network Analyzer

# The version string should be updated whenever changes are made to this config 
file
version: 2.0

# Name of folder containing the code defining a custom driver. Do not define 
this item
# or leave it blank for any standard driver based on the built-in VISA 
interface.
driver_path: Agilent_NetworkAnalyzer

# default interface and address
interface: TCPIP


[Model and options]
# The option section allow instruments with different options to use the same 
driver

# List of models supported by this driver
model_str_1: N5230
model_str_2: E8364B
model_str_3: E5071C
model_str_4: E5063A
model_str_5: N5232
model_str_6: N5222
model_str_7: N5231

# Check instrument model id at startup (True or False). Default is False
check_model: True

# Valid model strings returned by the instrument. Default value = model_str
model_id_1: N5230
model_id_2: E8364B
model_id_3: E5071C
model_id_4: E5063A
model_id_5: N5232
model_id_6: N5222
model_id_7: N5231



# General VISA settings for the instrument.
[VISA settings]

# Enable or disable communication over the VISA protocol (True or False)
# If False, the driver will not perform any operations (unless there is a 
custom driver).
use_visa = True

# Reset the interface (not the instrument) at startup (True or False).  Default 
is False
reset: True

# Time (in seconds) before the timing out while waiting for an instrument 
response. Default is 5
timeout: 10

# Query instrument errors (True or False).  If True, every command sent to the 
device will
# be followed by an error query.  This is useful when testing new setups, but 
may degrade
# performance by slowing down the instrument communication. 
query_instr_errors: False 

# Bit mask for checking status byte errors (default is 255, include all errors)
# The bits signal the following errors:
# 0: Operation
# 1: Request control
# 2: Query error
# 3: Device error
# 4: Execution error
# 5: Command error
# 6: User request
# 7: Power on
error_bit_mask: 255

# SCPI string to be used when querying for instrument error messages
error_cmd: 

# Initialization commands are sent to the instrument when starting the driver
# *RST will reset the device, *CLS clears the interface
init: :INIT:CONT ON;:FORM:BORD NORM;

# Boolean string values (used for sending True/False to instrument), default is 
1 and 0
#str_true: ON
#str_false: OFF

# Final commands sent to the instrument when closing the driver
final: 


# Define quantities in sections. This list is a selection of allowed keywords,
# see the manual for a full list of options
#   datatype:      The datatype should be one of DOUBLE, BOOLEAN, COMBO,
#                  STRING, COMPLEX, VECTOR, VECTOR_COMPLEX, PATH or BUTTON.
#   unit:          Quantity unit
#   set_cmd:       Command used to send data to the instrument. Put <*> where 
the value should appear.
#   get_cmd:       Command used to get the data from the instrument. Default is 
set_cmd?
#   def_value:     Default value
#   low_lim:       Lowest allowable value.  Defaults to -INF
#   high_lim:      Highest allowable values.  Defaults to +INF
#   combo_def_1:   First option in a pull-down combo box. Only used when 
datatype=COMBO
#   combo_def_2:   Second option in a pull-down combo box. Only used when 
datatype=COMBO
#   ...
#   combo_def_n:   nth option in a pull-down combo box. Only used when 
datatype=COMBO
#   state_quant:   Quantity that determines this control's visibility
#   state_value_1: Value of "state_quant" for which the control is visible
#   state_value_2: Value of "state_quant" for which the control is visible
#   ...
#   state_value_n: Value of "state_quant" for which the control is visible
#   permission:    Sets read/writability, options are BOTH, READ, WRITE or 
NONE. Default is BOTH 
#   group:         Name of the group where the control belongs.
#   section:       Name of the section where the control belongs.


[S11 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals

[S21 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals

[S12 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals

[S22 - Enabled]
datatype: BOOLEAN
def_value: False
group: Signals

[S11]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals

[S21]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals

[S12]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals

[S22]
x_name: Frequency
x_unit: Hz
datatype: VECTOR_COMPLEX
permission: READ
group: Signals

[S11 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals

[S21 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals

[S12 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals

[S22 - Value]
datatype: COMPLEX
unit: Hz
permission: READ
group: Signals

[Output enabled]
datatype: BOOLEAN
def_value: False
set_cmd: :OUTP
group: Output

[Output power]
datatype: DOUBLE
def_value: -30.0
unit: dBm
set_cmd: :SOUR:POW
group: Output

[IF bandwidth]
datatype: DOUBLE
def_value: 10E3
unit: Hz
set_cmd: :SENS:BWID
group: Acquisition

[Average]
datatype: BOOLEAN
def_value: False
set_cmd: :SENS:AVER
group: Acquisition

[# of averages]
datatype: DOUBLE
def_value: 10
low_lim: 1
high_lim: 65536
set_cmd: :SENS:AVER:COUN
group: Acquisition

[Wait for new trace]
datatype: BOOLEAN
def_value: False
group: Acquisition

[Range type]
datatype: COMBO
def_value: Start - Stop
combo_def_1: Center - Span
combo_def_2: Start - Stop
combo_def_3: Single frequency
permission: WRITE
group: Horizontal

[Start frequency]
datatype: DOUBLE
def_value: 4E9
unit: Hz
set_cmd: :SENS:FREQ:STAR
state_quant: Range type
state_value_1: Start - Stop
group: Horizontal

[Stop frequency]
datatype: DOUBLE
def_value: 12E9
unit: Hz
set_cmd: :SENS:FREQ:STOP
state_quant: Range type
state_value_1: Start - Stop
group: Horizontal

[Center frequency]
datatype: DOUBLE
def_value: 8E9
unit: Hz
set_cmd: :SENS:FREQ:CENT
state_quant: Range type
state_value_1: Center - Span
state_value_2: Single frequency
group: Horizontal

[Span]
datatype: DOUBLE
def_value: 8E9
unit: Hz
set_cmd: :SENS:FREQ:SPAN
state_quant: Range type
state_value_1: Center - Span
group: Horizontal

[# of points]
datatype: DOUBLE
def_value: 201
set_cmd: :SENS:SWE:POIN
state_quant: Range type
state_value_1: Center - Span
state_value_2: Start - Stop
group: Horizontal

[Sweep type]
datatype: COMBO
def_value: Linear
combo_def_1: Linear
combo_def_2: Log
combo_def_3: Lorentzian
group: Horizontal

[Q Value]
datatype: DOUBLE
def_value: 1.0
group: Horizontal
state_quant: Sweep type
state_value_1: Lorentzian

[Maximum Angle]
datatype: DOUBLE
unit: rad
def_value: 0.0
group: Horizontal
state_quant: Sweep type
state_value_1: Lorentzian

[Sweep mode]
datatype: COMBO
def_value: Stepped
combo_def_1: Stepped
combo_def_2: Analog
cmd_def_1: STEP
cmd_def_2: ANAL
set_cmd: :SENS:SWE:GEN
group: Horizontal

_______________________________________________
sigrok-devel mailing list
sigrok-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/sigrok-devel

Reply via email to