Dear,
I am revisiting an issue I encountered earlier. I have a flowgraph that
continuously generates pulses separated by gaps. To switch between
the RX and TX paths on a single antenna, I tried using GPIO to control
an RF switch.
Tests were conducted on a B210 and an X310. Initially, I used GPIO tags on a
USRP sink at a sample rate of 4 MHz, but this caused continuous underruns.
I then switched to using bursts in combination with ATR, but the
issue persists: the USRP is unable to keep up, resulting in significant
underruns.
My goal is to send pulses at specific times and have the GPIO state follow
accordingly. When sending 0s or no samples, the GPIO state should be low;
otherwise, it should be high.
Could you advise on how to achieve this? I have attached my embedded Python
block code, which is positioned just before the USRP sink block. This block
adds the tags for the start and end of bursts and handles the initial GPIO
setup. For reference, I have also included the manual GPIO control block.
Thank you for your assistance.
Best regards,
Tim Vancauwenbergh
import numpy as np
from gnuradio import gr
import pmt
class gpio_toggler(gr.sync_block):
    """Pass-through float block that tags threshold crossings with GPIO commands.

    Every input sample is copied unchanged to the output. Whenever the
    signal crosses ``threshold`` a ``tx_command`` stream tag is attached whose
    value is ``{"gpio": {bank, attr, value, mask}}``:

    * low -> high: ``OUT`` = ``gpio_high``, tagged one sample ahead of the
      first high sample (when that sample belongs to the current chunk);
    * high -> low: ``OUT`` = ``gpio_low``, tagged on the first low sample.

    On the first non-empty ``work`` call the ``DDR`` and ``CTRL`` setup
    commands are tagged once.

    NOTE(review): the bank name "FP0A" is hard-coded; confirm it matches the
    GPIO banks reported by the target device.
    """

    def __init__(self, threshold=0.5):
        """Set up the block.

        Args:
            threshold: Level a sample must exceed to count as "high".
        """
        super().__init__(
            name='GPIO Toggler',
            in_sig=[np.float32],
            out_sig=[np.float32]
        )
        self.threshold = threshold
        # State of the last sample of the previous chunk, so that crossings
        # on a chunk boundary are still detected.
        self.last_state_above_threshold = False

        # GPIO configuration
        self.gpio_mask = 0xFFF  # Mask for 12 GPIO pins
        self.gpio_high = 0xFFF  # Set all GPIO pins to HIGH
        self.gpio_low = 0x000   # Set all GPIO pins to LOW
        self.key = pmt.intern('tx_command')      # Key for the tx_command tag
        self.srcid = pmt.intern('gpio_toggler')  # Source ID for tagging

        # The setup commands below are tagged once, on the first work() call.
        self.gpio_initialized = False
        self.gpio_ddr_command = self.create_gpio_command("FP0A", "DDR", self.gpio_mask)
        self.gpio_ctrl_command = self.create_gpio_command("FP0A", "CTRL", self.gpio_low)

    def create_gpio_command(self, bank, attr, value):
        """Create a PMT dictionary for the GPIO command.

        Args:
            bank: GPIO bank name.
            attr: GPIO attribute name (e.g. "DDR", "CTRL", "OUT").
            value: Integer value for the attribute.

        Returns:
            PMT dict with the keys ``bank``, ``attr``, ``value`` and ``mask``;
            ``mask`` is always ``self.gpio_mask``.
        """
        gpio_command = pmt.make_dict()
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("bank"), pmt.intern(bank))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("attr"), pmt.intern(attr))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("value"), pmt.from_double(value))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("mask"), pmt.from_double(self.gpio_mask))
        return gpio_command

    def _tag_gpio(self, abs_offset, gpio_command):
        """Attach ``{"gpio": gpio_command}`` as a tx_command tag at abs_offset."""
        tx_command = pmt.dict_add(pmt.make_dict(), pmt.intern("gpio"), gpio_command)
        self.add_item_tag(0, abs_offset, self.key, tx_command, self.srcid)

    def work(self, input_items, output_items):
        """Copy input to output and tag threshold crossings.

        Returns:
            Number of items produced (length of the output buffer).
        """
        in0 = input_items[0]
        out0 = output_items[0]

        # Pass-through block
        out0[:] = in0

        if len(in0) == 0:
            return len(out0)

        first_offset = self.nitems_written(0)

        # Send GPIO setup commands on the first chunk only. Both are placed
        # on the first item of this chunk: an offset before first_offset
        # refers to an item produced earlier, which downstream may already
        # have consumed, so a tag placed there can be lost.
        if not self.gpio_initialized:
            self._tag_gpio(first_offset, self.gpio_ddr_command)
            self._tag_gpio(first_offset, self.gpio_ctrl_command)
            self.gpio_initialized = True

        # Vectorised edge detection: compare each sample's state with the
        # state of its predecessor (the predecessor of sample 0 is the last
        # sample of the previous chunk). A per-sample Python loop is far too
        # slow at MS/s rates; Python now only runs per crossing.
        above = in0 > self.threshold
        previous = np.empty_like(above)
        previous[0] = self.last_state_above_threshold
        previous[1:] = above[:-1]

        for edge in np.flatnonzero(above != previous):
            idx = int(edge)
            if above[idx]:
                # Rising edge: ON command one sample early, clamped so the
                # tag never lands on an item from a previous chunk.
                command = self.create_gpio_command("FP0A", "OUT", self.gpio_high)
                abs_offset = first_offset + max(idx - 1, 0)
            else:
                # Falling edge: OFF command on the first low sample.
                command = self.create_gpio_command("FP0A", "OUT", self.gpio_low)
                abs_offset = first_offset + idx
            self._tag_gpio(abs_offset, command)

        # Remember the final state for the next chunk.
        self.last_state_above_threshold = bool(above[-1])

        return len(out0)
import numpy as np
from gnuradio import gr
import pmt
import time
class gpio_toggler(gr.sync_block):
    """Pass-through float block that marks pulses as timed TX bursts.

    Every input sample is copied unchanged to the output. Threshold crossings
    are turned into stream tags:

    * low -> high: ``tx_sob``, ``tx_pkt_len`` (fixed value 80) and ``tx_time``
      (host wall-clock time of this work call plus 10 ms), placed one sample
      ahead of the first high sample when that sample is in the current chunk;
    * high -> low: ``tx_eob`` on the first low sample.

    A crossing is only tagged if at least ``dead_time_samples`` samples have
    passed since the previously tagged crossing; crossings inside the dead
    time are dropped.

    On the first ``work`` call the GPIO ``DDR`` and ``CTRL`` setup commands
    are sent as ``tx_command`` tags.

    NOTE(review): ``tx_time`` is derived from ``time.time()``; this presumably
    requires the device clock to be set to host time - confirm.
    NOTE(review): all bursts that start within one work call share the same
    ``tx_time`` value - confirm this is intended.
    NOTE(review): ``gpio_atr_command`` is built but never sent by this block.
    """

    def __init__(self, threshold=0.5, channel_type=0, sample_rate=4e6):
        """Set up the block.

        Args:
            threshold: Level a sample must exceed to count as "high".
            channel_type: 0 selects a 16 us dead time, any other value 40 us.
            sample_rate: Sample rate in samples per second, used to convert
                the dead time from microseconds to samples.
        """
        super().__init__(
            name='GPIO Tagger',
            in_sig=[np.float32],
            out_sig=[np.float32]
        )
        self.threshold = threshold
        # State of the last sample of the previous chunk, so that crossings
        # on a chunk boundary are still detected.
        self.last_state_above_threshold = False

        if channel_type == 0:
            self.dead_time_us = (8/2) + 12
        else:
            self.dead_time_us = (8/2) + 36
        self.dead_time_samples = int((self.dead_time_us / 1e6) * sample_rate)
        # Start with the dead time already elapsed so the very first
        # crossing is eligible for tagging.
        self.samples_since_last_tag = self.dead_time_samples

        self.gpio_key = pmt.intern('tx_command')
        self.srcid = pmt.intern('gpio_toggler')

        # Precompute GPIO setup commands to avoid repeated PMT dict creation
        self.gpio_initialized = False
        self.gpio_ddr_command = self.create_gpio_command("FP0", "DDR", 0x001, 0xFFF)
        self.gpio_ctrl_command = self.create_gpio_command("FP0", "CTRL", 0x001, 0xFFF)
        self.gpio_atr_command = self.create_gpio_command("FP0", "ATR_0X", 0x000, 0xFFF)

    def create_gpio_command(self, bank, attr, value, mask):
        """Build the PMT dict describing one GPIO attribute write.

        Args:
            bank: GPIO bank name.
            attr: GPIO attribute name (e.g. "DDR", "CTRL", "ATR_0X").
            value: Integer value for the attribute.
            mask: Integer bit mask selecting the affected pins.

        Returns:
            PMT dict with the keys ``bank``, ``attr``, ``value`` and ``mask``.
        """
        gpio_command = pmt.make_dict()
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("bank"), pmt.intern(bank))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("attr"), pmt.intern(attr))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("value"), pmt.from_double(value))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("mask"), pmt.from_double(mask))
        return gpio_command

    def _tag_gpio(self, abs_offset, gpio_command):
        """Attach ``{"gpio": gpio_command}`` as a tx_command tag at abs_offset."""
        # The command dictionary must be nested under the "gpio" key; a bare
        # bank/attr/value/mask dict is not a recognised tx_command.
        tx_command = pmt.dict_add(pmt.make_dict(), pmt.intern("gpio"), gpio_command)
        self.add_item_tag(0, abs_offset, self.gpio_key, tx_command, self.srcid)

    def work(self, input_items, output_items):
        """Copy input to output and tag burst boundaries.

        Returns:
            Number of items produced (length of the output buffer).
        """
        in0 = input_items[0]
        out0 = output_items[0]
        out0[:] = in0

        first_offset = self.nitems_written(0)

        # GPIO setup commands are added only once. Both go on the first item
        # of this chunk: an offset before first_offset refers to an item
        # produced earlier, which downstream may already have consumed.
        if not self.gpio_initialized:
            self._tag_gpio(first_offset, self.gpio_ddr_command)
            self._tag_gpio(first_offset, self.gpio_ctrl_command)
            self.gpio_initialized = True

        n_items = len(in0)
        if n_items == 0:
            return len(out0)

        # Vectorised edge detection: compare each sample's state with the
        # state of its predecessor (the predecessor of sample 0 is the last
        # sample of the previous chunk). Python then only runs per crossing
        # instead of per sample.
        above = in0 > self.threshold
        previous = np.empty_like(above)
        previous[0] = self.last_state_above_threshold
        previous[1:] = above[:-1]

        # Chunk-relative index of the most recently tagged crossing. A
        # negative value encodes the samples already counted in earlier
        # chunks, so "idx - last_tag_idx" equals the running per-sample
        # counter at sample idx.
        last_tag_idx = -1 - self.samples_since_last_tag
        dead_time_samples = self.dead_time_samples

        # Wall-clock time is read at most once per work call.
        current_time = None

        for edge in np.flatnonzero(above != previous):
            idx = int(edge)

            # Only proceed if dead time has elapsed
            if idx - last_tag_idx < dead_time_samples:
                continue

            if above[idx]:
                # Rising edge: start of burst, one sample early but never on
                # an item from a previous chunk.
                abs_offset = first_offset + max(idx - 1, 0)
                self.add_item_tag(0, abs_offset, pmt.intern('tx_sob'), pmt.PMT_T, self.srcid)
                self.add_item_tag(0, abs_offset, pmt.intern('tx_pkt_len'), pmt.from_long(80), self.srcid)

                if current_time is None:
                    current_time = time.time()
                # Split into whole seconds and fraction, then schedule the
                # burst 10 ms ahead, carrying into the seconds if needed.
                epoch_time = int(current_time)
                frac_time = (current_time % 1) + 0.010  # Adding 10 ms
                if frac_time >= 1.0:
                    epoch_time += 1
                    frac_time -= 1.0
                tx_time_val = pmt.make_tuple(pmt.from_uint64(epoch_time), pmt.from_double(frac_time))
                self.add_item_tag(0, abs_offset, pmt.intern('tx_time'), tx_time_val, self.srcid)
            else:
                # Falling edge: end of burst on the first low sample.
                abs_offset = first_offset + idx
                self.add_item_tag(0, abs_offset, pmt.intern('tx_eob'), pmt.PMT_T, self.srcid)

            last_tag_idx = idx

        # Carry state over to the next chunk.
        self.samples_since_last_tag = n_items - 1 - last_tag_idx
        self.last_state_above_threshold = bool(above[-1])

        return len(out0)
USRP-users mailing list -- [email protected]
To unsubscribe send an email to [email protected]