Hoernchen has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/osmo-trx/+/30414 )


Change subject: ill-fated ipcv2 for mstrx
......................................................................

ill-fated ipcv2 for mstrx

The problem here is that the ipc if requires reducing the clock ind
inteval to a few fn/ts on the trx side to work at all, but scheduling
still does not work out, unless you start driving the IF using the tx
side, at which point the approach is useless, because it does not really
do more than a burst loopback that is easier to do with the higher
layers/interfaces.

The code is still useful should there be a reason to continue working on
this.

Change-Id: I8f582c7c06fed8d1dcc5ea52472a97dc313fdde5
---
M Transceiver52M/device/Makefile.am
A Transceiver52M/device/ipc2/IPCDevice.cpp
A Transceiver52M/device/ipc2/IPCDevice.h
A Transceiver52M/device/ipc2/Makefile.am
A Transceiver52M/device/ipc2/ipcif.h
A Transceiver52M/device/ipc2/shmif.h
6 files changed, 1,281 insertions(+), 1 deletion(-)



  git pull ssh://gerrit.osmocom.org:29418/osmo-trx refs/changes/14/30414/1

diff --git a/Transceiver52M/device/Makefile.am 
b/Transceiver52M/device/Makefile.am
index 9af18f7..1fe7a8e 100644
--- a/Transceiver52M/device/Makefile.am
+++ b/Transceiver52M/device/Makefile.am
@@ -3,7 +3,7 @@
 SUBDIRS = common

 if DEVICE_IPC
-SUBDIRS += ipc
+SUBDIRS += ipc ipc2
 endif

 if DEVICE_USRP1
diff --git a/Transceiver52M/device/ipc2/IPCDevice.cpp 
b/Transceiver52M/device/ipc2/IPCDevice.cpp
new file mode 100644
index 0000000..0fc25c8
--- /dev/null
+++ b/Transceiver52M/device/ipc2/IPCDevice.cpp
@@ -0,0 +1,318 @@
+/*
+* Copyright 2022 sysmocom - s.f.m.c. GmbH <i...@sysmocom.de>
+* Author: Eric Wild <ew...@sysmocom.de>
+*
+* SPDX-License-Identifier: AGPL-3.0+
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program.  If not, see <http://www.gnu.org/licenses/>.
+* See the COPYING file in the main directory for details.
+*/
+
+#include <sys/time.h>
+#include <osmocom/core/timer_compat.h>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "Logger.h"
+#include "Threads.h"
+#include "IPCDevice.h"
+#include "smpl_buf.h"
+
+#define SAMPLE_BUF_SZ (1 << 20)
+static const auto ONE_BIT_DURATION ((12./5200.)/(156.25*4.));
+static const auto ONE_SAMPLE_DURATION_US ((ONE_BIT_DURATION/4.)*1000*1000);
+using namespace std;
+
+IPCDevice2::IPCDevice2(size_t tx_sps, size_t rx_sps, InterfaceType iface, 
size_t chan_num, double lo_offset,
+                      const std::vector<std::string> &tx_paths, const 
std::vector<std::string> &rx_paths)
+       : RadioDevice(tx_sps, rx_sps, iface, chan_num, lo_offset, tx_paths, 
rx_paths), rx_buffers(chans),
+         started(false), tx_gains(chans), rx_gains(chans)
+{
+       LOGC(DDEV, INFO) << "creating IPC device...";
+
+       if (!(tx_sps == 4) || !(rx_sps == 4)) {
+               LOGC(DDEV, FATAL) << "IPC shm if create failed!";
+               exit(0);
+       }
+
+       /* Set up per-channel Rx timestamp based Ring buffers */
+       for (size_t i = 0; i < rx_buffers.size(); i++)
+               rx_buffers[i] = new smpl_buf(SAMPLE_BUF_SZ / sizeof(uint32_t));
+
+       if (!m.create()) {
+               LOGC(DDEV, FATAL) << "IPC shm if create failed!";
+               exit(0);
+       }
+}
+
+IPCDevice2::~IPCDevice2()
+{
+       LOGC(DDEV, INFO) << "Closing IPC device";
+       /* disable all channels */
+
+       for (size_t i = 0; i < rx_buffers.size(); i++)
+               delete rx_buffers[i];
+}
+
+int IPCDevice2::open(const std::string &args, int ref, bool swap_channels)
+{
+       std::string k, v;
+
+       /* configure antennas */
+       if (!set_antennas()) {
+               LOGC(DDEV, FATAL) << "IPC antenna setting failed";
+               goto out_close;
+       }
+
+       return iface == MULTI_ARFCN ? MULTI_ARFCN : NORMAL;
+
+out_close:
+       LOGC(DDEV, FATAL) << "Error in IPC open, closing";
+       return -1;
+}
+
+bool IPCDevice2::start()
+{
+       LOGC(DDEV, INFO) << "starting IPC...";
+
+       if (started) {
+               LOGC(DDEV, ERR) << "Device already started";
+               return true;
+       }
+
+       int max_bufs_to_flush = 120;
+       flush_recv(max_bufs_to_flush);
+
+       started = true;
+       return true;
+}
+
+bool IPCDevice2::stop()
+{
+       if (!started)
+               return true;
+
+       LOGC(DDEV, NOTICE) << "All channels stopped, terminating...";
+
+       /* reset internal buffer timestamps */
+       for (size_t i = 0; i < rx_buffers.size(); i++)
+               rx_buffers[i]->reset();
+
+       started = false;
+       return true;
+}
+
+double IPCDevice2::maxRxGain()
+{
+       return 70;
+}
+
+double IPCDevice2::minRxGain()
+{
+       return 0;
+}
+
+int IPCDevice2::getNominalTxPower(size_t chan)
+{
+       return 10;
+}
+
+double IPCDevice2::setPowerAttenuation(int atten, size_t chan)
+{
+       return atten;
+}
+
+double IPCDevice2::getPowerAttenuation(size_t chan)
+{
+       return 0;
+}
+
+double IPCDevice2::setRxGain(double dB, size_t chan)
+{
+       if (dB > maxRxGain())
+               dB = maxRxGain();
+       if (dB < minRxGain())
+               dB = minRxGain();
+
+       LOGCHAN(chan, DDEV, NOTICE) << "Setting RX gain to " << dB << " dB";
+
+       return dB;
+}
+
+bool IPCDevice2::flush_recv(size_t num_pkts)
+{
+       ts_initial = 10000;
+
+       LOGC(DDEV, INFO) << "Initial timestamp " << ts_initial << std::endl;
+       return true;
+}
+
+bool IPCDevice2::setRxAntenna(const std::string &ant, size_t chan)
+{
+       return true;
+}
+
+std::string IPCDevice2::getRxAntenna(size_t chan)
+{
+       return "";
+}
+
+bool IPCDevice2::setTxAntenna(const std::string &ant, size_t chan)
+{
+       return true;
+}
+
+std::string IPCDevice2::getTxAntenna(size_t chan)
+{
+       return "";
+}
+
+bool IPCDevice2::requiresRadioAlign()
+{
+       return false;
+}
+
+GSM::Time IPCDevice2::minLatency()
+{
+       /* UNUSED */
+       return GSM::Time(0, 0);
+}
+
+/** Returns the starting write Timestamp*/
+TIMESTAMP IPCDevice2::initialWriteTimestamp(void)
+{
+       return ts_initial;
+}
+
+/** Returns the starting read Timestamp*/
+TIMESTAMP IPCDevice2::initialReadTimestamp(void)
+{
+       return ts_initial;
+}
+
+static timespec readtime, writetime;
+static void wait_for_sample_time(timespec* last, unsigned int len) {
+       #if 1
+       timespec ts, diff;
+       clock_gettime(CLOCK_MONOTONIC, &ts);
+       timespecsub(&ts, last, &diff);
+       auto elapsed_us = (diff.tv_sec * 1000000) + (diff.tv_nsec / 1000);
+       auto max_wait_us = ONE_SAMPLE_DURATION_US * len;
+       if(elapsed_us < max_wait_us)
+               usleep(max_wait_us-elapsed_us);
+       *last = ts;
+       #else
+       usleep(ONE_SAMPLE_DURATION_US * 625);
+       #endif
+}
+
+// NOTE: Assumes sequential reads
+int IPCDevice2::readSamples(std::vector<short *> &bufs, int len, bool 
*overrun, TIMESTAMP timestamp, bool *underrun)
+{
+       int rc, num_smpls; //, expect_smpls;
+       ssize_t avail_smpls;
+       unsigned int i = 0;
+
+       *overrun = false;
+       *underrun = false;
+
+       timestamp += 0;
+
+       /* Check that timestamp is valid */
+       rc = rx_buffers[0]->avail_smpls(timestamp);
+       if (rc < 0) {
+               LOGC(DDEV, ERROR) << rx_buffers[0]->str_code(rc);
+               LOGC(DDEV, ERROR) << rx_buffers[0]->str_status(timestamp);
+               return 0;
+       }
+
+       /* Receive samples from HW until we have enough */
+       while ((avail_smpls = rx_buffers[i]->avail_smpls(timestamp)) < len) {
+               uint64_t recv_timestamp = timestamp;
+
+               m.read_ul(len - avail_smpls, &recv_timestamp, 
reinterpret_cast<sample_t *>(bufs[0]));
+               num_smpls = len - avail_smpls;
+               wait_for_sample_time(&readtime, num_smpls);
+
+               if (num_smpls == -ETIMEDOUT)
+                       continue;
+
+               LOGCHAN(i, DDEV, DEBUG)
+               "Received timestamp = " << (TIMESTAMP)recv_timestamp << " (" << 
num_smpls << ")";
+
+               rc = rx_buffers[i]->write(bufs[i], num_smpls, 
(TIMESTAMP)recv_timestamp);
+               if (rc < 0) {
+                       LOGCHAN(i, DDEV, ERROR)
+                               << rx_buffers[i]->str_code(rc) << " num smpls: 
" << num_smpls << " chan: " << i;
+                       LOGCHAN(i, DDEV, ERROR) << 
rx_buffers[i]->str_status(timestamp);
+                       if (rc != smpl_buf::ERROR_OVERFLOW)
+                               return 0;
+               }
+       }
+
+       /* We have enough samples */
+
+       rc = rx_buffers[i]->read(bufs[i], len, timestamp);
+       if ((rc < 0) || (rc != len)) {
+               LOGCHAN(i, DDEV, ERROR) << rx_buffers[i]->str_code(rc) << ". " 
<< rx_buffers[i]->str_status(timestamp)
+                                       << ", (len=" << len << ")";
+               return 0;
+       }
+
+       return len;
+}
+
+int IPCDevice2::writeSamples(std::vector<short *> &bufs, int len, bool 
*underrun, unsigned long long timestamp)
+{
+       *underrun = false;
+
+       LOGCHAN(0, DDEV, DEBUG) << "send buffer of len " << len << " timestamp 
" << std::hex << timestamp;
+
+       // rc = ipc_shm_enqueue(shm_io_tx_streams[i], timestamp, len, (uint16_t 
*)bufs[i]);
+       m.write_dl(len, timestamp, reinterpret_cast<sample_t *>(bufs[0]));
+       wait_for_sample_time(&writetime, len);
+
+       return len;
+}
+
+bool IPCDevice2::updateAlignment(TIMESTAMP timestamp)
+{
+       return true;
+}
+
+bool IPCDevice2::setTxFreq(double wFreq, size_t chan)
+{
+       return true;
+}
+
+bool IPCDevice2::setRxFreq(double wFreq, size_t chan)
+{
+       return true;
+}
+
+RadioDevice *RadioDevice::make(size_t tx_sps, size_t rx_sps, InterfaceType 
iface, size_t chans, double lo_offset,
+                              const std::vector<std::string> &tx_paths, const 
std::vector<std::string> &rx_paths)
+{
+       if (tx_sps != rx_sps) {
+               LOGC(DDEV, ERROR) << "IPC Requires tx_sps == rx_sps";
+               return NULL;
+       }
+       if (lo_offset != 0.0) {
+               LOGC(DDEV, ERROR) << "IPC doesn't support lo_offset";
+               return NULL;
+       }
+       return new IPCDevice2(tx_sps, rx_sps, iface, chans, lo_offset, 
tx_paths, rx_paths);
+}
diff --git a/Transceiver52M/device/ipc2/IPCDevice.h 
b/Transceiver52M/device/ipc2/IPCDevice.h
new file mode 100644
index 0000000..6286979
--- /dev/null
+++ b/Transceiver52M/device/ipc2/IPCDevice.h
@@ -0,0 +1,186 @@
+/*
+* Copyright 2022 sysmocom - s.f.m.c. GmbH <i...@sysmocom.de>
+* Author: Eric Wild <ew...@sysmocom.de>
+*
+* SPDX-License-Identifier: AGPL-3.0+
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program.  If not, see <http://www.gnu.org/licenses/>.
+* See the COPYING file in the main directory for details.
+*/
+
+#ifndef _IPC_DEVICE_H_
+#define _IPC_DEVICE_H_
+
+
+#include <climits>
+#include <string>
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "radioDevice.h"
+#include "ipcif.h"
+
+class smpl_buf;
+
+class IPCDevice2 : public RadioDevice {
+       trxmsif m;
+    protected:
+       std::vector<smpl_buf *> rx_buffers;
+       double actualSampleRate;
+
+       bool started;
+
+       TIMESTAMP ts_initial;
+
+       std::vector<double> tx_gains, rx_gains;
+
+       bool flush_recv(size_t num_pkts);
+       void update_stream_stats_rx(size_t chan, bool *overrun);
+       void update_stream_stats_tx(size_t chan, bool *underrun);
+
+       bool send_chan_wait_rsp(uint32_t chan, struct msgb *msg_to_send, 
uint32_t expected_rsp_msg_id);
+       bool send_all_chan_wait_rsp(uint32_t msgid_to_send, uint32_t 
msgid_to_expect);
+
+    public:
+       /** Object constructor */
+       IPCDevice2(size_t tx_sps, size_t rx_sps, InterfaceType iface, size_t 
chan_num, double lo_offset,
+                  const std::vector<std::string> &tx_paths, const 
std::vector<std::string> &rx_paths);
+       virtual ~IPCDevice2() override;
+
+       /** Instantiate the IPC */
+       virtual int open(const std::string &args, int ref, bool swap_channels) 
override;
+
+       /** Start the IPC */
+       virtual bool start() override;
+
+       /** Stop the IPC */
+       virtual bool stop() override;
+
+       /* FIXME: any != USRP1 will do for now... */
+       enum TxWindowType getWindowType() override
+       {
+               return TX_WINDOW_FIXED;
+       }
+
+       /**
+       Read samples from the IPC.
+       @param buf preallocated buf to contain read result
+       @param len number of samples desired
+       @param overrun Set if read buffer has been overrun, e.g. data not being 
read fast enough
+       @param timestamp The timestamp of the first samples to be read
+       @param underrun Set if IPC does not have data to transmit, e.g. data 
not being sent fast enough
+       @return The number of samples actually read
+       */
+       virtual int readSamples(std::vector<short *> &buf, int len, bool 
*overrun, TIMESTAMP timestamp = 0xffffffff,
+                               bool *underrun = NULL) override;
+       /**
+       Write samples to the IPC.
+       @param buf Contains the data to be written.
+       @param len number of samples to write.
+       @param underrun Set if IPC does not have data to transmit, e.g. data 
not being sent fast enough
+       @param timestamp The timestamp of the first sample of the data buffer.
+       @return The number of samples actually written
+       */
+       virtual int writeSamples(std::vector<short *> &bufs, int len, bool 
*underrun,
+                                TIMESTAMP timestamp = 0xffffffff) override;
+
+       /** Update the alignment between the read and write timestamps */
+       virtual bool updateAlignment(TIMESTAMP timestamp) override;
+
+       /** Set the transmitter frequency */
+       virtual bool setTxFreq(double wFreq, size_t chan = 0) override;
+
+       /** Set the receiver frequency */
+       virtual bool setRxFreq(double wFreq, size_t chan = 0) override;
+
+       /** Returns the starting write Timestamp*/
+       virtual TIMESTAMP initialWriteTimestamp(void) override;
+
+       /** Returns the starting read Timestamp*/
+       virtual TIMESTAMP initialReadTimestamp(void) override;
+
+       /** returns the full-scale transmit amplitude **/
+       virtual double fullScaleInputValue() override
+       {
+               return (double)SHRT_MAX * 1;
+       }
+
+       /** returns the full-scale receive amplitude **/
+       virtual double fullScaleOutputValue() override
+       {
+               return (double)SHRT_MAX * 1;
+       }
+
+       /** sets the receive chan gain, returns the gain setting **/
+       virtual double setRxGain(double dB, size_t chan = 0) override;
+
+       /** get the current receive gain */
+       virtual double getRxGain(size_t chan = 0) override
+       {
+               return rx_gains[chan];
+       }
+
+       /** return maximum Rx Gain **/
+       virtual double maxRxGain(void) override;
+
+       /** return minimum Rx Gain **/
+       virtual double minRxGain(void) override;
+
+       /* FIXME: return rx_gains[chan] ? receive factor from IPC Driver? */
+       double rssiOffset(size_t chan) override
+       {
+               return 0.0f;
+       };
+
+       double setPowerAttenuation(int atten, size_t chan) override;
+       double getPowerAttenuation(size_t chan = 0) override;
+
+       virtual int getNominalTxPower(size_t chan = 0) override;
+
+       /** sets the RX path to use, returns true if successful and false 
otherwise */
+       virtual bool setRxAntenna(const std::string &ant, size_t chan = 0) 
override;
+
+       /* return the used RX path */
+       virtual std::string getRxAntenna(size_t chan = 0) override;
+
+       /** sets the RX path to use, returns true if successful and false 
otherwise */
+       virtual bool setTxAntenna(const std::string &ant, size_t chan = 0) 
override;
+
+       /* return the used RX path */
+       virtual std::string getTxAntenna(size_t chan = 0) override;
+
+       /** return whether user drives synchronization of Tx/Rx of USRP */
+       virtual bool requiresRadioAlign() override;
+
+       /** return whether user drives synchronization of Tx/Rx of USRP */
+       virtual GSM::Time minLatency() override;
+
+       /** Return internal status values */
+       virtual inline double getTxFreq(size_t chan = 0) override
+       {
+               return 0;
+       }
+       virtual inline double getRxFreq(size_t chan = 0) override
+       {
+               return 0;
+       }
+       virtual inline double getSampleRate() override
+       {
+               return actualSampleRate;
+       }
+};
+
+#endif // _IPC_DEVICE_H_
diff --git a/Transceiver52M/device/ipc2/Makefile.am 
b/Transceiver52M/device/ipc2/Makefile.am
new file mode 100644
index 0000000..ec5d533
--- /dev/null
+++ b/Transceiver52M/device/ipc2/Makefile.am
@@ -0,0 +1,14 @@
+include $(top_srcdir)/Makefile.common
+
+AM_CPPFLAGS = -Wall $(STD_DEFINES_AND_INCLUDES) -I${srcdir}/../common
+AM_CFLAGS = -lpthread $(LIBOSMOCORE_CFLAGS)
+AM_CXXFLAGS = -lpthread $(LIBOSMOCORE_CFLAGS)
+AM_LDFLAGS = -lpthread -lrt
+
+noinst_HEADERS = IPCDevice.h
+
+noinst_LTLIBRARIES = libdevice.la
+
+libdevice_la_SOURCES = IPCDevice.cpp
+libdevice_la_LIBADD = 
$(top_builddir)/Transceiver52M/device/common/libdevice_common.la
+libdevice_la_CXXFLAGS = $(AM_CXXFLAGS) -DIPCMAGIC
diff --git a/Transceiver52M/device/ipc2/ipcif.h 
b/Transceiver52M/device/ipc2/ipcif.h
new file mode 100644
index 0000000..72f4c28
--- /dev/null
+++ b/Transceiver52M/device/ipc2/ipcif.h
@@ -0,0 +1,387 @@
+/*
+ * (C) 2022 by sysmocom s.f.m.c. GmbH <i...@sysmocom.de>
+ * All Rights Reserved
+ *
+ * Author: Eric Wild <ew...@sysmocom.de>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#pragma once
+
+#include <atomic>
+#include <complex>
+#include <cassert>
+#include <deque>
+#include <mutex>
+#include <vector>
+
+#include "shmif.h"
+
+const int max_ul_rdlen = 1024 * 10;
+const int max_dl_rdlen = 1024 * 10;
+using sample_t = std::complex<int16_t>;
+struct shm_if {
+       std::atomic<bool> ms_connected;
+       struct {
+               shm::sema r;
+               shm::sema w;
+               std::atomic<uint64_t> ts;
+               std::atomic<uint64_t> ts_req;
+               std::atomic<size_t> len_written_sps; // ->
+               sample_t buffer[max_ul_rdlen];
+       } ul;
+       struct {
+               shm::sema r;
+               shm::sema w;
+               std::atomic<uint64_t> ts;
+               std::atomic<uint64_t> ts_req;
+               std::atomic<size_t> len_written_sps;
+               sample_t buffer[max_dl_rdlen];
+       } dl;
+};
+
+// unique up to signed_type/2 diff
+// ex: uint8/int8 (250, 0) = -6
+template <typename A> auto unsigned_diff(A a, A b) -> typename 
std::make_signed<A>::type
+{
+       using stype = typename std::make_signed<A>::type;
+       return (a > b) ? static_cast<stype>(a - b) : -static_cast<stype>(b - a);
+};
+
+constexpr inline int samp2byte(int v)
+{
+       return v * sizeof(sample_t);
+}
+constexpr inline int byte2samp(int v)
+{
+       return v / sizeof(sample_t);
+}
+
+struct ulentry {
+       bool done;
+       uint64_t ts;
+       unsigned int len_in_sps;
+       unsigned int read_pos_in_sps;
+       sample_t buf[1000];
+};
+/*
+               write: find read index +.. until marked free = "end" of current 
list
+
+               check:
+               within begin, end AND not free?
+                       y:
+                       copy (chunk)
+                               if chunk advance burst buf ptr
+                       n: next, advance, remove old.
+               */
+template <unsigned int num_bursts> class ulburstprovider {
+       std::mutex ul_q_m;
+       // std::deque<ulentry> ul_q;
+
+       // classic circular buffer
+       ulentry foo[num_bursts];
+       int current_index; // % num_bursts
+
+       void cur_buf_done()
+       {
+               foo[current_index].done = true;
+               current_index = current_index + 1 % num_bursts;
+       }
+       bool is_empty()
+       {
+               return foo[current_index].done = true;
+       }
+       void reset()
+       {
+               for (auto &i : foo)
+                       i = {};
+               current_index = 0;
+       }
+       ulentry &find_free_at_end()
+       {
+               for (int i = current_index, max_to_search = 0; max_to_search < 
num_bursts;
+                    i = (i + 1 % num_bursts), max_to_search++) {
+                       if (foo[i].done)
+                               return foo[i];
+               }
+               return foo[0]; // FIXME actually broken, q full, wat do?
+       }
+
+       void push_back(ulentry &e)
+       {
+               auto free_buf = find_free_at_end();
+               free_buf = e;
+               e.done = false;
+       }
+
+    public:
+       void add(ulentry &e)
+       {
+               std::lock_guard<std::mutex> foo(ul_q_m);
+               push_back(e);
+       }
+       void get(uint64_t requested_ts, unsigned int req_len_in_sps, sample_t 
*buf, unsigned int max_buf_write_len)
+       {
+               std::lock_guard<std::mutex> g(ul_q_m);
+
+               /*
+               1) if empty return
+               2) if not empty prune stale bursts
+               3) if only future bursts also return and zero buf
+               */
+               for (int i = current_index, max_to_search = 0; max_to_search < 
num_bursts;
+                    i = (i + 1 % num_bursts), max_to_search++) {
+                       auto cur_entry = foo[i];
+                       if (is_empty()) { // might be empty due to advance 
below!
+                               memset(buf, 0, samp2byte(req_len_in_sps));
+                               return;
+                       }
+
+                       if (cur_entry.ts + cur_entry.len_in_sps < requested_ts) 
{ // remove late bursts
+                               if (i == current_index) // only advance if we 
are at the front
+                                       cur_buf_done();
+                               else
+                                       assert(true);
+                       } else if (cur_entry.ts >= requested_ts + 
byte2samp(max_buf_write_len)) { // not in range
+                               memset(buf, 0, samp2byte(req_len_in_sps));
+                               return;
+
+                               // FIXME: what about requested_ts <= entry.ts 
<= ts + reqlen?
+                       } else {
+                               // requested_ts <= cur_entry.ts <= requested_ts 
+ byte2samp(max_write_len)
+
+                               auto before_sps = unsigned_diff(cur_entry.ts, 
requested_ts);
+
+                               // at least one whole buffer before our most 
recent "head" burst?
+                               // set 0, return.
+                               if (-before_sps >= 
byte2samp(max_buf_write_len)) {
+                                       memset(buf, 0, 
samp2byte(req_len_in_sps));
+                                       return;
+                               }
+                               // less than one full buffer before: pad 0
+                               auto to_pad_sps = -before_sps;
+                               memset(buf, 0, samp2byte(to_pad_sps));
+                               requested_ts += to_pad_sps;
+                               req_len_in_sps -= to_pad_sps;
+
+                               if (!req_len_in_sps)
+                                       return;
+
+                               // actual burst data after possible 0 pad
+                               auto max_sps_to_write = 
std::min(cur_entry.len_in_sps, req_len_in_sps);
+                               memcpy(&buf[samp2byte(to_pad_sps)], 
cur_entry.buf, samp2byte(max_sps_to_write));
+                               requested_ts += max_sps_to_write;
+                               req_len_in_sps -= max_sps_to_write;
+                               cur_entry.read_pos_in_sps += max_sps_to_write;
+
+                               //this buf is done...
+                               if (cur_entry.read_pos_in_sps == 
cur_entry.len_in_sps) {
+                                       cur_buf_done();
+                               }
+
+                               if (!req_len_in_sps)
+                                       return;
+                       }
+               }
+       }
+};
+
+class trxmsif {
+       shm::shm<shm_if> m;
+       shm_if *ptr;
+
+       ulburstprovider<10> p;
+
+       template <typename T> void read(T &direction, size_t howmany_sps, 
uint64_t *read_ts, sample_t *outbuf)
+       {
+               static int readoffset_sps;
+               // auto &direction = ptr->dl;
+               auto buf = &direction.buffer[0];
+               size_t len_avail_sps = direction.len_written_sps.load();
+
+               auto left_to_read = len_avail_sps - readoffset_sps;
+
+               shm::mtx_log::print_guard() << "\tr @" << direction.ts.load() 
<< " " << readoffset_sps << std::endl;
+
+               // no data, wait for new buffer, maybe some data left afterwards
+               if (!left_to_read) {
+                       assert(readoffset_sps == len_avail_sps);
+                       readoffset_sps = 0;
+                       direction.r.reset_unsafe();
+                       direction.ts_req = (*read_ts);
+                       direction.w.set(1);
+                       direction.r.wait_and_reset(1);
+                       assert(*read_ts != direction.ts.load());
+                       // shm::sema_guard g(dl.r, dl.w);
+                       *read_ts = direction.ts.load();
+                       len_avail_sps = direction.len_written_sps.load();
+                       readoffset_sps += howmany_sps;
+                       assert(len_avail_sps >= howmany_sps);
+                       memcpy(outbuf, buf, samp2byte(howmany_sps));
+
+                       shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " 
" << howmany_sps << std::endl;
+                       return;
+               }
+
+               *read_ts = direction.ts.load() + readoffset_sps;
+               left_to_read = len_avail_sps - readoffset_sps;
+
+               // data left from prev read
+               if (left_to_read >= howmany_sps) {
+                       memcpy(outbuf, &buf[readoffset_sps], 
samp2byte(howmany_sps));
+                       readoffset_sps += howmany_sps;
+
+                       shm::mtx_log::print_guard() << "\tr++ " << *read_ts << 
" " << howmany_sps << std::endl;
+                       return;
+               } else {
+                       memcpy(outbuf, &buf[readoffset_sps], 
samp2byte(left_to_read));
+                       readoffset_sps = 0;
+                       auto still_left_to_read = howmany_sps - left_to_read;
+                       {
+                               direction.r.reset_unsafe();
+                               direction.ts_req = (*read_ts);
+                               direction.w.set(1);
+                               direction.r.wait_and_reset(1);
+                               assert(*read_ts != direction.ts.load());
+                               len_avail_sps = 
direction.len_written_sps.load();
+                               assert(len_avail_sps >= still_left_to_read);
+                               memcpy(&outbuf[left_to_read], buf, 
samp2byte(still_left_to_read));
+                               readoffset_sps += still_left_to_read;
+                               shm::mtx_log::print_guard()
+                                       << "\tr+++2 " << *read_ts << " " << 
howmany_sps << " " << still_left_to_read
+                                       << " new @" << direction.ts.load() << 
std::endl;
+                       }
+               }
+       }
+
+    public:
+       trxmsif() : m("trx-ms-if")
+       {
+       }
+
+       bool create()
+       {
+               m.create();
+               ptr = m.p();
+               return m.isgood();
+       }
+       bool connect()
+       {
+               m.open();
+               ptr = m.p();
+               ptr->ms_connected = true;
+               ptr->dl.w.set(1);
+               return m.isgood();
+       }
+       bool good()
+       {
+               return m.isgood();
+       }
+       bool is_connected()
+       {
+               return ptr->ms_connected == true;
+       }
+
+       /* is being read from ms side */
+       void read_dl(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
+       {
+               return read(ptr->dl, howmany_sps, read_ts, outbuf);
+       }
+
+       /* is being read from trx/network side */
+       void read_ul(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
+       {
+               // if (ptr->ms_connected != true) {
+                       memset(outbuf, 0, samp2byte(howmany_sps));
+               //      return;
+               // }
+               // return read(ptr->ul, howmany_sps, read_ts, outbuf);
+       }
+
+       void write_dl(size_t howmany_sps, uint64_t write_ts, sample_t *inbuf)
+       {
+               auto &dl = ptr->dl;
+               auto buf = &dl.buffer[0];
+               if (ptr->ms_connected != true)
+                       return;
+
+               assert(sizeof(dl.buffer) >= samp2byte(howmany_sps));
+               // print_guard() << "####w " << std::endl;
+
+               {
+                       shm::sema_wait_guard g(dl.w, dl.r);
+
+                       memcpy(buf, inbuf, samp2byte(howmany_sps));
+                       dl.ts.store(write_ts);
+                       dl.len_written_sps.store(howmany_sps);
+               }
+               shm::mtx_log::print_guard() << std::endl
+                                           << "####w+ " << write_ts << " " << 
howmany_sps << std::endl
+                                           << std::endl;
+       }
+
+       void write_ul(size_t howmany_sps_sps, uint64_t write_ts, sample_t 
*inbuf)
+       {
+               auto &ul = ptr->ul;
+               assert(sizeof(ul.buffer) >= samp2byte(howmany_sps_sps));
+               // print_guard() << "####w " << std::endl;
+
+               ulentry e;
+               e.ts = write_ts;
+               e.len_in_sps = howmany_sps_sps;
+               e.done = false;
+               e.read_pos_in_sps = 0;
+               assert(sizeof(e.buf) >= samp2byte(howmany_sps_sps));
+               memcpy(e.buf, inbuf, samp2byte(howmany_sps_sps));
+               p.add(e);
+
+               shm::mtx_log::print_guard() << std::endl
+                                           << "####q+ " << write_ts << " " << 
howmany_sps_sps << std::endl
+                                           << std::endl;
+       }
+
+       void drive_tx()
+       {
+               auto &ul = ptr->ul;
+               auto buf = &ul.buffer[0];
+               const auto max_write_len = sizeof(ul.buffer);
+
+               // ul_q_m.lock();
+               // ul_q.push_front(e);
+               // ul_q_m.unlock();
+               // ul.w.wait_and_reset();
+
+               // no read waiting for a write
+               if (!ul.w.check_unsafe(1))
+                       return;
+
+               // FIXME: store written, notify after get!
+
+               auto requested_ts = ul.ts_req.load();
+
+               p.get(requested_ts, byte2samp(max_write_len), buf, 
max_write_len);
+
+               // memset(buf, 0, max_write_len);
+               ul.ts.store(requested_ts);
+               ul.len_written_sps.store(byte2samp(max_write_len));
+               ul.w.reset_unsafe();
+               ul.r.set(1);
+       }
+
+       void signal_read_start()
+       { /* nop */
+       }
+};
diff --git a/Transceiver52M/device/ipc2/shmif.h 
b/Transceiver52M/device/ipc2/shmif.h
new file mode 100644
index 0000000..89413ab
--- /dev/null
+++ b/Transceiver52M/device/ipc2/shmif.h
@@ -0,0 +1,375 @@
+/*
+ * (C) 2022 by sysmocom s.f.m.c. GmbH <i...@sysmocom.de>
+ * All Rights Reserved
+ *
+ * Author: Eric Wild <ew...@sysmocom.de>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#pragma once
+
+#include <atomic>
+#include <iostream>
+#include <cassert>
+#include <cstring>
+#include <mutex>
+#include <sstream>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <cerrno>
+
+namespace shm
+{
+
+namespace mtx_log
+{
+#if defined(MTX_LOG_ENABLED)
+       class print_guard : public std::ostringstream {
+               static std::mutex thread_print_lock;
+
+           public:
+               ~print_guard()
+               {
+                       std::lock_guard<std::mutex> guard(thread_print_lock);
+                       std::cerr << str();
+               }
+       };
+
+#else
+       struct print_guard {};
+
+       template <typename T> constexpr print_guard operator<<(const 
print_guard dummy, T &&value)
+       {
+               return dummy;
+       }
+
+       constexpr print_guard operator<<(const print_guard &dummy, std::ostream 
&(*f)(std::ostream &))
+       {
+               return dummy;
+       }
+
+#endif
+} // namespace mtx_log
+
+class shmmutex {
+       pthread_mutex_t mutex;
+
+    public:
+       shmmutex()
+       {
+               pthread_mutexattr_t attr;
+               pthread_mutexattr_init(&attr);
+               pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+               pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
+               pthread_mutex_init(&mutex, &attr);
+               pthread_mutexattr_destroy(&attr);
+       }
+
+       ~shmmutex()
+       {
+               pthread_mutex_destroy(&mutex);
+       }
+
+       void lock()
+       {
+               pthread_mutex_lock(&mutex);
+       }
+
+       bool try_lock()
+       {
+               return pthread_mutex_trylock(&mutex);
+       }
+
+       void unlock()
+       {
+               pthread_mutex_unlock(&mutex);
+       }
+
+       pthread_mutex_t *p()
+       {
+               return &mutex;
+       }
+       shmmutex(const shmmutex &) = delete;
+       shmmutex &operator=(const shmmutex &) = delete;
+};
+
+class shmcond {
+       pthread_cond_t cond;
+
+    public:
+       shmcond()
+       {
+               pthread_condattr_t attr;
+               pthread_condattr_init(&attr);
+               pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+               pthread_cond_init(&cond, &attr);
+               pthread_condattr_destroy(&attr);
+       }
+
+       ~shmcond()
+       {
+               pthread_cond_destroy(&cond);
+       }
+
+       void wait(shmmutex *lock)
+       {
+               pthread_cond_wait(&cond, lock->p());
+       }
+
+       void signal()
+       {
+               pthread_cond_signal(&cond);
+       }
+
+       void signal_all()
+       {
+               pthread_cond_broadcast(&cond);
+       }
+       shmcond(const shmcond &) = delete;
+       shmcond &operator=(const shmcond &) = delete;
+};
+
+class signal_guard {
+       shmmutex &m;
+       shmcond &s;
+
+    public:
+       signal_guard() = delete;
+       explicit signal_guard(shmmutex &m, shmcond &wait_for, shmcond 
&to_signal) : m(m), s(to_signal)
+       {
+               m.lock();
+               wait_for.wait(&m);
+       }
+       ~signal_guard()
+       {
+               s.signal();
+               m.unlock();
+       }
+       signal_guard(const signal_guard &) = delete;
+       signal_guard &operator=(const signal_guard &) = delete;
+};
+
+class mutex_guard {
+       shmmutex &m;
+
+    public:
+       mutex_guard() = delete;
+       explicit mutex_guard(shmmutex &m) : m(m)
+       {
+               m.lock();
+       }
+       ~mutex_guard()
+       {
+               m.unlock();
+       }
+       mutex_guard(const mutex_guard &) = delete;
+       mutex_guard &operator=(const mutex_guard &) = delete;
+};
+
+class sema {
+       std::atomic<int> value;
+       shmmutex m;
+       shmcond c;
+
+    public:
+       sema() : value(0)
+       {
+       }
+       explicit sema(int v) : value(v)
+       {
+       }
+
+       void wait()
+       {
+               wait(1);
+       }
+       void wait(int v)
+       {
+               mtx_log::print_guard() << __FUNCTION__ << value << std::endl;
+               mutex_guard g(m);
+               assert(value <= v);
+               while (value != v)
+                       c.wait(&m);
+       }
+       void wait_and_reset()
+       {
+               wait_and_reset(1);
+       }
+       void wait_and_reset(int v)
+       {
+               mtx_log::print_guard() << __FUNCTION__ << value << std::endl;
+               mutex_guard g(m);
+               assert(value <= v);
+               while (value != v)
+                       c.wait(&m);
+               value = 0;
+       }
+       void set()
+       {
+               set(1);
+       }
+       void set(int v)
+       {
+               mtx_log::print_guard() << __FUNCTION__ << value << std::endl;
+               mutex_guard g(m);
+               value = v;
+               c.signal();
+       }
+       void reset_unsafe()
+       {
+               value = 0;
+       }
+       bool check_unsafe(int v)
+       {
+               return value == v;
+       }
+       sema(const sema &) = delete;
+       sema &operator=(const sema &) = delete;
+};
+
+class sema_wait_guard {
+       sema &a;
+       sema &b;
+
+    public:
+       sema_wait_guard() = delete;
+       explicit sema_wait_guard(sema &wait, sema &signal) : a(wait), b(signal)
+       {
+               a.wait_and_reset(1);
+       }
+       ~sema_wait_guard()
+       {
+               b.set(1);
+       }
+       sema_wait_guard(const sema_wait_guard &) = delete;
+       sema_wait_guard &operator=(const sema_wait_guard &) = delete;
+};
+
+class sema_signal_guard {
+       sema &a;
+       sema &b;
+
+    public:
+       sema_signal_guard() = delete;
+       explicit sema_signal_guard(sema &wait, sema &signal) : a(wait), 
b(signal)
+       {
+               a.wait_and_reset(1);
+       }
+       ~sema_signal_guard()
+       {
+               b.set(1);
+       }
+       sema_signal_guard(const sema_signal_guard &) = delete;
+       sema_signal_guard &operator=(const sema_signal_guard &) = delete;
+};
+
+template <typename IFT> class shm {
+       char shmname[512];
+       size_t IFT_sz = sizeof(IFT);
+       IFT *shmptr;
+       bool good;
+       int ipc_shm_setup(const char *shm_name)
+       {
+               int fd;
+               int rc;
+               void *ptr;
+
+               if ((fd = shm_open(shm_name, O_CREAT | O_RDWR | O_TRUNC, 
S_IRUSR | S_IWUSR)) < 0) {
+                       rc = -errno;
+                       return rc;
+               }
+
+               if (ftruncate(fd, IFT_sz) < 0) {
+                       rc = -errno;
+                       shm_unlink(shm_name);
+                       ::close(fd);
+               }
+
+               if ((ptr = mmap(NULL, IFT_sz, PROT_READ | PROT_WRITE, 
MAP_SHARED, fd, 0)) == MAP_FAILED) {
+                       rc = -errno;
+                       shm_unlink(shm_name);
+                       ::close(fd);
+               }
+
+               shmptr = new (ptr) IFT(); //static_cast<IFT *>(ptr);
+               ::close(fd);
+               return 0;
+       }
+
+       int ipc_shm_connect(const char *shm_name)
+       {
+               int fd;
+               int rc;
+               void *ptr;
+
+               if ((fd = shm_open(shm_name, O_CREAT | O_RDWR, S_IRUSR | 
S_IWUSR)) < 0) {
+                       rc = -errno;
+                       return rc;
+               }
+
+               struct stat shm_stat;
+               if (fstat(fd, &shm_stat) < 0) {
+                       rc = -errno;
+                       shm_unlink(shm_name);
+                       ::close(fd);
+               }
+
+               if ((ptr = mmap(NULL, shm_stat.st_size, PROT_READ | PROT_WRITE, 
MAP_SHARED, fd, 0)) == MAP_FAILED) {
+                       rc = -errno;
+                       shm_unlink(shm_name);
+                       ::close(fd);
+               }
+
+               shmptr = static_cast<IFT *>(ptr);
+               ::close(fd);
+               return 0;
+       }
+
+    public:
+       using IFT_t = IFT;
+       explicit shm(const char *name) : good(false)
+       {
+               strncpy((char *)shmname, name, 512);
+       }
+       void create()
+       {
+               if (ipc_shm_setup(shmname) == 0)
+                       good = true;
+       }
+       void open()
+       {
+               if (ipc_shm_connect(shmname) == 0)
+                       good = true;
+       }
+       bool isgood() const
+       {
+               return good;
+       }
+       void close()
+       {
+               if (isgood())
+                       shm_unlink(shmname);
+       }
+       IFT *p()
+       {
+               return shmptr;
+       }
+};
+
+} // namespace shm

--
To view, visit https://gerrit.osmocom.org/c/osmo-trx/+/30414
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-trx
Gerrit-Branch: master
Gerrit-Change-Id: I8f582c7c06fed8d1dcc5ea52472a97dc313fdde5
Gerrit-Change-Number: 30414
Gerrit-PatchSet: 1
Gerrit-Owner: Hoernchen <ew...@sysmocom.de>
Gerrit-MessageType: newchange

Reply via email to