On Tue, 28 Sep 2021, Gijs Peskens wrote:

libRIST internally stores packets in a fifo of 1024 packets, overwriting
old packets when not read in a sufficient pace. Unfortunately this results
in many fifo overflow errors when ffmpeg consumes a libRIST stream.
This patch creates a receiver thread based on the UDP circular buffer code.

The user is better off adjusting the libRIST fifo size, so this patch adds extra complexity for no good reason.

Regards,
Marton


Signed-off-by: Gijs Peskens <g...@peskens.net>
---
libavformat/librist.c | 201 ++++++++++++++++++++++++++++++++++++++++--
1 file changed, 196 insertions(+), 5 deletions(-)

diff --git a/libavformat/librist.c b/libavformat/librist.c
index b120346f48..47c01a8432 100644
--- a/libavformat/librist.c
+++ b/libavformat/librist.c
@@ -26,6 +26,8 @@
#include "libavutil/opt.h"
#include "libavutil/parseutils.h"
#include "libavutil/time.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"

#include "avformat.h"
#include "internal.h"
@@ -33,6 +35,15 @@
#include "os_support.h"
#include "url.h"

+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
+#if HAVE_PTHREAD_CANCEL
+#include "libavutil/thread.h"
+#endif
+
#include <librist/librist.h>
#include <librist/version.h>
// RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead
@@ -67,6 +78,19 @@ typedef struct RISTContext {

    struct rist_peer *peer;
    struct rist_ctx *ctx;
+
+    int circular_buffer_size;
+    AVFifoBuffer *fifo;
+    int circular_buffer_error;
+    int overrun_nonfatal;
+
+#if HAVE_PTHREAD_CANCEL
+    pthread_t receiver_thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int thread_started;
+    int thread_stop;
+#endif
} RISTContext;

#define D AV_OPT_FLAG_DECODING_PARAM
@@ -82,6 +106,8 @@ static const AVOption librist_options[] = {
    { "log_level",   "set loglevel",    OFFSET(log_level),   AV_OPT_TYPE_INT,   
{.i64=RIST_LOG_INFO},        -1, INT_MAX, .flags = D|E },
    { "secret", "set encryption secret",OFFSET(secret),      
AV_OPT_TYPE_STRING,{.str=NULL},                  0, 0,       .flags = D|E },
    { "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT   
,{.i64=0},                     0, INT_MAX, .flags = D|E },
+    { "fifo_size",      "set the receiving circular buffer size, expressed as a 
number of packets with size of 188 bytes, 0 to disable", OFFSET(circular_buffer_size), 
AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
+    { "overrun_nonfatal", "survive in case of receiving circular buffer 
overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
    { NULL }
};

@@ -119,6 +145,15 @@ static int librist_close(URLContext *h)
    RISTContext *s = h->priv_data;
    int ret = 0;

+#if HAVE_PTHREAD_CANCEL
+    if (s->thread_started) {
+        pthread_mutex_lock(&s->mutex);
+        s->thread_stop = 1;
+        pthread_mutex_unlock(&s->mutex);
+        pthread_join(s->receiver_thread, NULL);
+    }
+#endif
+    av_fifo_freep(&s->fifo);
    s->peer = NULL;

    if (s->ctx)
@@ -128,6 +163,78 @@ static int librist_close(URLContext *h)
    return risterr2ret(ret);
}

+static void *receiver_thread(void *_url_context)
+{
+    URLContext *h = _url_context;
+    RISTContext *s = h->priv_data;
+    int ret;
+    uint8_t tmp[4];
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+    const struct rist_data_block *data_block;
+#else
+    struct rist_data_block *data_block;
+#endif
+
+    while (1)
+    {
+        pthread_mutex_lock(&s->mutex);
+        if (s->thread_stop)
+            break;
+        pthread_mutex_unlock(&s->mutex);
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+        ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
+#else
+        ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
+#endif
+        if (ret == 0)
+            continue;
+
+        pthread_mutex_lock(&s->mutex);
+        if (ret < 0) {
+            s->circular_buffer_error = ret;
+            break;
+        }
+
+        if (data_block->payload_len > MAX_PAYLOAD_SIZE) {
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+            rist_receiver_data_block_free((struct 
rist_data_block**)&data_block);
+#else
+            rist_receiver_data_block_free2(&data_block);
+#endif
+            s->circular_buffer_error = AVERROR_EXTERNAL;
+            break;
+        }
+        AV_WL32(tmp, data_block->payload_len);
+        if (av_fifo_space(s->fifo) < (data_block->payload_len +4))
+        {
+            /* No Space left */
+            if (s->overrun_nonfatal) {
+                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
+                        "Surviving due to overrun_nonfatal option\n");
+                continue;
+            } else {
+                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
+                        "To avoid, increase fifo_size URL option. "
+                        "To survive in such case, use overrun_nonfatal 
option\n");
+                s->circular_buffer_error = AVERROR(EIO);
+                break;
+            }
+        }
+        av_fifo_generic_write(s->fifo, tmp, 4, NULL);
+        av_fifo_generic_write(s->fifo, (void*)data_block->payload, 
data_block->payload_len, NULL);
+        pthread_mutex_unlock(&s->mutex);
+        pthread_cond_signal(&s->cond);
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+        rist_receiver_data_block_free((struct rist_data_block**)&data_block);
+#else
+        rist_receiver_data_block_free2(&data_block);
+#endif
+    }
+    pthread_mutex_unlock(&s->mutex);
+    pthread_cond_signal(&s->cond);
+    return NULL;
+}
+
static int librist_open(URLContext *h, const char *uri, int flags)
{
    RISTContext *s = h->priv_data;
@@ -194,27 +301,111 @@ static int librist_open(URLContext *h, const char *uri, 
int flags)
    if (ret < 0)
        goto err;

+    s->circular_buffer_size *= 188;
+
+#if HAVE_PTHREAD_CANCEL
+    //Create receiver thread if circular buffer size is set and we are 
receiving
+    if ((flags & AVIO_FLAG_READ) && s->circular_buffer_size > 0) {
+        /* start the task going */
+        s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        if (!s->fifo) {
+            ret = AVERROR(ENOMEM);
+            goto err;
+        }
+        ret = pthread_mutex_init(&s->mutex, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", 
strerror(ret));
+            ret = AVERROR(ret);
+            goto err;
+        }
+        ret = pthread_cond_init(&s->cond, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", 
strerror(ret));
+            ret = AVERROR(ret);
+            goto cond_fail;
+        }
+        ret = pthread_create(&s->receiver_thread, NULL, receiver_thread, h);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", 
strerror(ret));
+            ret = AVERROR(ret);
+            goto thread_fail;
+        }
+        s->thread_started = 1;
+    }
+#endif
    return 0;
-
+#if HAVE_PTHREAD_CANCEL
+ thread_fail:
+    pthread_cond_destroy(&s->cond);
+ cond_fail:
+    pthread_mutex_destroy(&s->mutex);
+#endif
err:
    librist_close(h);
-
+    av_fifo_freep(&s->fifo);
    return risterr2ret(ret);
}

static int librist_read(URLContext *h, uint8_t *buf, int size)
{
    RISTContext *s = h->priv_data;
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+    const struct rist_data_block *data_block;
+#else
+    struct rist_data_block *data_block;
+#endif
    int ret;

+#if HAVE_PTHREAD_CANCEL
+    int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
+
+    if (s->fifo) {
+        pthread_mutex_lock(&s->mutex);
+        do {
+            avail = av_fifo_size(s->fifo);
+            if (avail) { // >=size) {
+                uint8_t tmp[4];
+
+                av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+                avail = AV_RL32(tmp);
+                if(avail > size){
+                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to 
insufficient buffer size\n");
+                    avail = size;
+                }
+
+                av_fifo_generic_read(s->fifo, buf, avail, NULL);
+                av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
+                pthread_mutex_unlock(&s->mutex);
+                return avail;
+            } else if(s->circular_buffer_error){
+                int err = s->circular_buffer_error;
+                pthread_mutex_unlock(&s->mutex);
+                return err;
+            } else if(nonblock) {
+                pthread_mutex_unlock(&s->mutex);
+                return AVERROR(EAGAIN);
+            } else {
+                /* FIXME: using the monotonic clock would be better,
+                   but it does not exist on all supported platforms. */
+                int64_t t = av_gettime() + 100000;
+                struct timespec tv = { .tv_sec  =  t / 1000000,
+                                       .tv_nsec = (t % 1000000) * 1000 };
+                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
+                if (err) {
+                    pthread_mutex_unlock(&s->mutex);
+                    return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
+                }
+                nonblock = 1;
+            }
+        } while(1);
+    }
+#endif
+
#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
-    const struct rist_data_block *data_block;
    ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
#else
-    struct rist_data_block *data_block;
    ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
#endif
-
    if (ret < 0)
        return risterr2ret(ret);

--
2.30.2

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".

Reply via email to