Dear,

I am revisiting an issue I encountered earlier. I have a flow that
generates pulses with spaces between them continuously. To switch between
the RX and TX paths on a single antenna, I tried utilizing GPIO to control
an RF switch.

Tests were conducted on a B210 and X310. Initially, I used GPIO tags on a
USRP sink, but this caused continuous underruns. A sample rate of 4 MHz was
used. I then switched to using bursts in combination with ATR, but the
issue persists. The USRP is unable to keep up, resulting in significant
underruns.

My goal is to send pulses at specific times and have the GPIO state follow
accordingly. When sending 0s or no samples, the GPIO state should be low;
otherwise, it should be high.

Could you advise on how to achieve this? I have attached my embedded Python
block code, which is positioned just before the USRP sink block. This block
adds the tags for the start and end of bursts and handles the initial GPIO
setup. For reference, I have also included the manual GPIO control block.

Thank you for your assistance.

Best regards,

Tim Vancauwenbergh
import numpy as np
from gnuradio import gr
import pmt

class gpio_toggler(gr.sync_block):
    def __init__(self, threshold=0.5):
        # Initialize the block
        super().__init__(
            name='GPIO Toggler',
            in_sig=[np.float32],
            out_sig=[np.float32]
        )

        self.threshold = threshold
        self.last_state_above_threshold = False  # Track the previous state

        # GPIO configuration
        self.gpio_mask = 0xFFF  # Mask for 12 GPIO pins
        self.gpio_high = 0xFFF  # Set all GPIO pins to HIGH
        self.gpio_low = 0x000    # Set all GPIO pins to LOW

        self.key = pmt.intern('tx_command')  # Key for the tx_command tag
        self.srcid = pmt.intern('gpio_toggler')  # Source ID for tagging

        # Flag to check if GPIO has been initialized
        self.gpio_initialized = False

        # Create initial commands for setting up GPIO
        self.gpio_ddr_command = self.create_gpio_command("FP0A", "DDR", self.gpio_mask)
        self.gpio_ctrl_command = self.create_gpio_command("FP0A", "CTRL", self.gpio_low)

    def create_gpio_command(self, bank, attr, value):
        """Create a PMT dictionary for the GPIO command."""
        gpio_command = pmt.make_dict()
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("bank"), pmt.intern(bank))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("attr"), pmt.intern(attr))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("value"), pmt.from_double(value))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("mask"), pmt.from_double(self.gpio_mask))
        return gpio_command

    def work(self, input_items, output_items):
        in0 = input_items[0]
        out0 = output_items[0]

        # Pass-through block
        out0[:] = in0

        for i in range(len(in0)):
            sample = in0[i]
            current_state_above_threshold = sample > self.threshold

            # Send GPIO setup commands on the first iteration
            if not self.gpio_initialized:
                # Tag GPIO setup command for DDR
                tx_command_ddr = pmt.make_dict()
                tx_command_ddr = pmt.dict_add(tx_command_ddr, pmt.intern("gpio"), self.gpio_ddr_command)

                abs_offset_ddr = max(self.nitems_written(0) + i - 1, 0)
                self.add_item_tag(0, abs_offset_ddr, self.key, tx_command_ddr, self.srcid)

                # Tag GPIO setup command for CTRL
                tx_command_ctrl = pmt.make_dict()
                tx_command_ctrl = pmt.dict_add(tx_command_ctrl, pmt.intern("gpio"), self.gpio_ctrl_command)

                abs_offset_ctrl = max(self.nitems_written(0) + i, 0)
                self.add_item_tag(0, abs_offset_ctrl, self.key, tx_command_ctrl, self.srcid)

                # Set GPIO initialized flag to True to prevent re-sending
                self.gpio_initialized = True

            # Check for threshold crossing from low to high (ON command)
            if current_state_above_threshold and not self.last_state_above_threshold:
                # Create GPIO ON command
                gpio_on_command = self.create_gpio_command("FP0A", "OUT", self.gpio_high)

                # Create tx_command dictionary with gpio as its value
                tx_command = pmt.make_dict()
                tx_command = pmt.dict_add(tx_command, pmt.intern("gpio"), gpio_on_command)

                # Tag the ON command one sample before the current index
                abs_offset = max(self.nitems_written(0) + i - 1, 0)
                self.add_item_tag(0, abs_offset, self.key, tx_command, self.srcid)

            # Check for threshold crossing from high to low (OFF command)
            elif not current_state_above_threshold and self.last_state_above_threshold:
                # Create GPIO OFF command
                gpio_off_command = self.create_gpio_command("FP0A", "OUT", self.gpio_low)

                # Create tx_command dictionary with gpio as its value
                tx_command = pmt.make_dict()
                tx_command = pmt.dict_add(tx_command, pmt.intern("gpio"), gpio_off_command)

                # Tag the OFF command at the current index
                abs_offset = self.nitems_written(0) + i
                self.add_item_tag(0, abs_offset, self.key, tx_command, self.srcid)

            # Update last state for next iteration
            self.last_state_above_threshold = current_state_above_threshold

        return len(output_items[0])
import numpy as np
from gnuradio import gr
import pmt
import time

class gpio_toggler(gr.sync_block):
    def __init__(self, threshold=0.5, channel_type=0, sample_rate=4e6):
        super().__init__(
            name='GPIO Tagger',
            in_sig=[np.float32],
            out_sig=[np.float32]
        )

        self.threshold = threshold
        self.last_state_above_threshold = False
        if channel_type == 0:
            self.dead_time_us = (8/2) + 12
        else:
            self.dead_time_us = (8/2) + 36
            
        self.dead_time_samples = int((self.dead_time_us / 1e6) * sample_rate)
        self.samples_since_last_tag = self.dead_time_samples

        self.gpio_key = pmt.intern('tx_command')
        self.srcid = pmt.intern('gpio_toggler')

        # Precompute GPIO setup commands to avoid repeated PMT dict creation
        self.gpio_initialized = False
        self.gpio_ddr_command = self.create_gpio_command("FP0", "DDR", 0x001, 0xFFF)
        self.gpio_ctrl_command = self.create_gpio_command("FP0", "CTRL", 0x001, 0xFFF)
        self.gpio_atr_command = self.create_gpio_command("FP0", "ATR_0X", 0x000, 0xFFF)

    def create_gpio_command(self, bank, attr, value, mask):
        gpio_command = pmt.make_dict()
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("bank"), pmt.intern(bank))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("attr"), pmt.intern(attr))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("value"), pmt.from_double(value))
        gpio_command = pmt.dict_add(gpio_command, pmt.intern("mask"), pmt.from_double(mask))
        return gpio_command

    def work(self, input_items, output_items):
        in0 = input_items[0]
        out0 = output_items[0]
        out0[:] = in0

        # Local copies to reduce attribute access in loop
        threshold = self.threshold
        samples_since_last_tag = self.samples_since_last_tag
        last_state_above_threshold = self.last_state_above_threshold
        nitems_written_0 = self.nitems_written(0)

        if not self.gpio_initialized:
            # Pre-created GPIO setup commands are added only once
            abs_offset_ddr = max(nitems_written_0 - 1, 0)
            self.add_item_tag(0, abs_offset_ddr, self.gpio_key, self.gpio_ddr_command, self.srcid)
            
            abs_offset_ctrl = max(nitems_written_0, 0)
            self.add_item_tag(0, abs_offset_ctrl, self.gpio_key, self.gpio_ctrl_command, self.srcid)

            self.gpio_initialized = True

        # Avoid repeated calls to time.time() by updating only when needed
        current_time = None

        for i, sample in enumerate(in0):
            current_state_above_threshold = sample > threshold
            samples_since_last_tag += 1

            # Only proceed if dead time has elapsed
            if samples_since_last_tag >= self.dead_time_samples:
                # Check for threshold crossing (low-to-high)
                if current_state_above_threshold and not last_state_above_threshold:
                    abs_offset = max(nitems_written_0 + i - 1, 0)
                    self.add_item_tag(0, abs_offset, pmt.intern('tx_sob'), pmt.PMT_T, self.srcid)
                    self.add_item_tag(0, abs_offset, pmt.intern('tx_pkt_len'), pmt.from_long(80), self.srcid)

                    if current_time is None:
                        current_time = time.time()
                    
                    # Use cached current_time for consistency
                    epoch_time = int(current_time)
                    frac_time = (current_time % 1) + 0.010  # Adding 10 ms

                    if frac_time >= 1.0:
                        epoch_time += 1
                        frac_time -= 1.0

                    tx_time_val = pmt.make_tuple(pmt.from_uint64(epoch_time), pmt.from_double(frac_time))
                    self.add_item_tag(0, abs_offset, pmt.intern('tx_time'), tx_time_val, self.srcid)

                    samples_since_last_tag = 0

                # Check for threshold crossing (high-to-low)
                elif not current_state_above_threshold and last_state_above_threshold:
                    abs_offset = nitems_written_0 + i
                    self.add_item_tag(0, abs_offset, pmt.intern('tx_eob'), pmt.PMT_T, self.srcid)
                    samples_since_last_tag = 0

            # Update the last state for the next iteration
            last_state_above_threshold = current_state_above_threshold

        # Update self attributes after loop completes
        self.samples_since_last_tag = samples_since_last_tag
        self.last_state_above_threshold = last_state_above_threshold

        return len(out0)
_______________________________________________
USRP-users mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to