Re: [FFmpeg-devel] [PATCH 2/3] librist: allow use of circular buffer for receiving.

2021-09-28 Thread Marton Balint




On Tue, 28 Sep 2021, Gijs Peskens wrote:


libRIST internally stores packets in a fifo of 1024 packets, overwriting
old packets when they are not read at a sufficient pace. Unfortunately this results
in many fifo overflow errors when ffmpeg consumes a libRIST stream.
This patch creates a receiver thread based on the UDP circular buffer code.


The user is better off adjusting the libRIST fifo size, so this patch adds 
extra complexity for no good reason.


Regards,
Marton



Signed-off-by: Gijs Peskens 
---
libavformat/librist.c | 201 --
1 file changed, 196 insertions(+), 5 deletions(-)

diff --git a/libavformat/librist.c b/libavformat/librist.c
index b120346f48..47c01a8432 100644
--- a/libavformat/librist.c
+++ b/libavformat/librist.c
@@ -26,6 +26,8 @@
#include "libavutil/opt.h"
#include "libavutil/parseutils.h"
#include "libavutil/time.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"

#include "avformat.h"
#include "internal.h"
@@ -33,6 +35,15 @@
#include "os_support.h"
#include "url.h"

+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
+#if HAVE_PTHREAD_CANCEL
+#include "libavutil/thread.h"
+#endif
+
#include <librist/librist.h>
#include <librist/version.h>
// RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead
@@ -67,6 +78,19 @@ typedef struct RISTContext {

struct rist_peer *peer;
struct rist_ctx *ctx;
+
+int circular_buffer_size;
+AVFifoBuffer *fifo;
+int circular_buffer_error;
+int overrun_nonfatal;
+
+#if HAVE_PTHREAD_CANCEL
+pthread_t receiver_thread;
+pthread_mutex_t mutex;
+pthread_cond_t cond;
+int thread_started;
+int thread_stop;
+#endif
} RISTContext;

#define D AV_OPT_FLAG_DECODING_PARAM
@@ -82,6 +106,8 @@ static const AVOption librist_options[] = {
{ "log_level",   "set loglevel",OFFSET(log_level),   AV_OPT_TYPE_INT,   
{.i64=RIST_LOG_INFO},-1, INT_MAX, .flags = D|E },
{ "secret", "set encryption secret",OFFSET(secret),  
AV_OPT_TYPE_STRING,{.str=NULL},  0, 0,   .flags = D|E },
{ "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT   
,{.i64=0}, 0, INT_MAX, .flags = D|E },
+{ "fifo_size",  "set the receiving circular buffer size, expressed as a 
number of packets with size of 188 bytes, 0 to disable", OFFSET(circular_buffer_size), 
AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
+{ "overrun_nonfatal", "survive in case of receiving circular buffer 
overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,D },
{ NULL }
};

@@ -119,6 +145,15 @@ static int librist_close(URLContext *h)
RISTContext *s = h->priv_data;
int ret = 0;

+#if HAVE_PTHREAD_CANCEL
+if (s->thread_started) {
+pthread_mutex_lock(&s->mutex);
+s->thread_stop = 1;
+pthread_mutex_unlock(&s->mutex);
+pthread_join(s->receiver_thread, NULL);
+}
+#endif
+av_fifo_freep(&s->fifo);
s->peer = NULL;

if (s->ctx)
@@ -128,6 +163,78 @@ static int librist_close(URLContext *h)
return risterr2ret(ret);
}

+static void *receiver_thread(void *_url_context)
+{
+URLContext *h = _url_context;
+RISTContext *s = h->priv_data;
+int ret;
+uint8_t tmp[4];
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+const struct rist_data_block *data_block;
+#else
+struct rist_data_block *data_block;
+#endif
+
+while (1)
+{
+pthread_mutex_lock(&s->mutex);
+if (s->thread_stop)
+break;
+pthread_mutex_unlock(&s->mutex);
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
+#else
+ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
+#endif
+if (ret == 0)
+continue;
+
+pthread_mutex_lock(&s->mutex);
+if (ret < 0) {
+s->circular_buffer_error = ret;
+break;
+}
+
+if (data_block->payload_len > MAX_PAYLOAD_SIZE) {
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+rist_receiver_data_block_free((struct 
rist_data_block**)&data_block);
+#else
+rist_receiver_data_block_free2(&data_block);
+#endif
+s->circular_buffer_error = AVERROR_EXTERNAL;
+break;
+}
+AV_WL32(tmp, data_block->payload_len);
+if (av_fifo_space(s->fifo) < (data_block->payload_len +4))
+{
+/* No Space left */
+if (s->overrun_nonfatal) {
+av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
+"Surviving due to overrun_nonfatal option\n");
+continue;
+} else {
+av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
+"To avoid, increase fifo_size URL option. "
+"To survive in such case, use overrun_nonfatal 
option\n");
+s->circular_buffer_error = AVERROR(EIO);
+break;
+ 

[FFmpeg-devel] [PATCH 2/3] librist: allow use of circular buffer for receiving.

2021-09-28 Thread Gijs Peskens
libRIST internally stores packets in a fifo of 1024 packets, overwriting
old packets when they are not read at a sufficient pace. Unfortunately this results
in many fifo overflow errors when ffmpeg consumes a libRIST stream.
This patch creates a receiver thread based on the UDP circular buffer code.

Signed-off-by: Gijs Peskens 
---
 libavformat/librist.c | 201 --
 1 file changed, 196 insertions(+), 5 deletions(-)

diff --git a/libavformat/librist.c b/libavformat/librist.c
index b120346f48..47c01a8432 100644
--- a/libavformat/librist.c
+++ b/libavformat/librist.c
@@ -26,6 +26,8 @@
 #include "libavutil/opt.h"
 #include "libavutil/parseutils.h"
 #include "libavutil/time.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
 
 #include "avformat.h"
 #include "internal.h"
@@ -33,6 +35,15 @@
 #include "os_support.h"
 #include "url.h"
 
+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
+#if HAVE_PTHREAD_CANCEL
+#include "libavutil/thread.h"
+#endif
+
 #include <librist/librist.h>
 #include <librist/version.h>
 // RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead
@@ -67,6 +78,19 @@ typedef struct RISTContext {
 
 struct rist_peer *peer;
 struct rist_ctx *ctx;
+
+int circular_buffer_size;
+AVFifoBuffer *fifo;
+int circular_buffer_error;
+int overrun_nonfatal;
+
+#if HAVE_PTHREAD_CANCEL
+pthread_t receiver_thread;
+pthread_mutex_t mutex;
+pthread_cond_t cond;
+int thread_started;
+int thread_stop;
+#endif
 } RISTContext;
 
 #define D AV_OPT_FLAG_DECODING_PARAM
@@ -82,6 +106,8 @@ static const AVOption librist_options[] = {
 { "log_level",   "set loglevel",OFFSET(log_level),   AV_OPT_TYPE_INT,  
 {.i64=RIST_LOG_INFO},-1, INT_MAX, .flags = D|E },
 { "secret", "set encryption secret",OFFSET(secret),  
AV_OPT_TYPE_STRING,{.str=NULL},  0, 0,   .flags = D|E },
 { "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT   
,{.i64=0}, 0, INT_MAX, .flags = D|E },
+{ "fifo_size",  "set the receiving circular buffer size, expressed as 
a number of packets with size of 188 bytes, 0 to disable", 
OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
+{ "overrun_nonfatal", "survive in case of receiving circular buffer 
overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,D },
 { NULL }
 };
 
@@ -119,6 +145,15 @@ static int librist_close(URLContext *h)
 RISTContext *s = h->priv_data;
 int ret = 0;
 
+#if HAVE_PTHREAD_CANCEL
+if (s->thread_started) {
+pthread_mutex_lock(&s->mutex);
+s->thread_stop = 1;
+pthread_mutex_unlock(&s->mutex);
+pthread_join(s->receiver_thread, NULL);
+}
+#endif
+av_fifo_freep(&s->fifo);
 s->peer = NULL;
 
 if (s->ctx)
@@ -128,6 +163,78 @@ static int librist_close(URLContext *h)
 return risterr2ret(ret);
 }
 
+static void *receiver_thread(void *_url_context)
+{
+URLContext *h = _url_context;
+RISTContext *s = h->priv_data;
+int ret;
+uint8_t tmp[4];
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+const struct rist_data_block *data_block;
+#else
+struct rist_data_block *data_block;
+#endif
+
+while (1)
+{
+pthread_mutex_lock(&s->mutex);
+if (s->thread_stop)
+break;
+pthread_mutex_unlock(&s->mutex);
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
+#else
+ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
+#endif
+if (ret == 0)
+continue;
+
+pthread_mutex_lock(&s->mutex);
+if (ret < 0) {
+s->circular_buffer_error = ret;
+break;
+}
+
+if (data_block->payload_len > MAX_PAYLOAD_SIZE) {
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+rist_receiver_data_block_free((struct 
rist_data_block**)&data_block);
+#else
+rist_receiver_data_block_free2(&data_block);
+#endif
+s->circular_buffer_error = AVERROR_EXTERNAL;
+break;
+}
+AV_WL32(tmp, data_block->payload_len);
+if (av_fifo_space(s->fifo) < (data_block->payload_len +4))
+{
+/* No Space left */
+if (s->overrun_nonfatal) {
+av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
+"Surviving due to overrun_nonfatal option\n");
+continue;
+} else {
+av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
+"To avoid, increase fifo_size URL option. "
+"To survive in such case, use overrun_nonfatal 
option\n");
+s->circular_buffer_error = AVERROR(EIO);
+break;
+}
+}
+av_fifo_generic_write(s->fifo, tmp, 4, NULL);
+av_fifo_generic_write(s->fifo, (void*)data_block->payload,