On Fri, 23 Aug 2019, Andriy Gelman wrote:

On Mon, 19. Aug 17:28, Andriy Gelman wrote:
Minor changes in v3: 1. Removed tab character as per feedback. 2. Removed unused timeout variable from ZMQContext.
Andriy

From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
From: Andriy Gelman <andriy.gel...@gmail.com>
Date: Tue, 30 Jul 2019 14:39:32 -0400
Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option

Currently multiple clients are only supported by using a multicast
destination address. An alternative is to stream to a server which
re-distributes the content. This commit adds ZeroMQ as a protocol
option, which allows multiple clients to connect to a single ffmpeg
instance.
---
 configure               |   2 +
 doc/general.texi        |   1 +
 doc/protocols.texi      |  32 ++++++++
 libavformat/Makefile    |   1 +
 libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 6 files changed, 195 insertions(+)
 create mode 100644 libavformat/libzmq.c

Missing changelog entry and libavformat minor version bump.


diff --git a/configure b/configure
index c09c842809..a4134024c2 100755
--- a/configure
+++ b/configure
@@ -3411,6 +3411,8 @@ libsrt_protocol_deps="libsrt"
 libsrt_protocol_select="network"
 libssh_protocol_deps="libssh"
 libtls_conflict="openssl gnutls mbedtls"
+libzmq_protocol_deps="libzmq"
+libzmq_protocol_select="network"

You may want to enforce a minimum version requirement for libzmq in the pkg_config part of configure depending on the API you use.


 # filters
 afftdn_filter_deps="avcodec"
diff --git a/doc/general.texi b/doc/general.texi
index 3c0c803449..b8e063268c 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1329,6 +1329,7 @@ performance on systems without hardware floating point 
support).
 @item TCP          @tab X
 @item TLS          @tab X
 @item UDP          @tab X
+@item ZMQ          @tab E
 @end multitable

 @code{X} means that the protocol is supported.
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 3e4e7af3d4..174eaacd0f 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1728,4 +1728,36 @@ Timeout in ms.
 Create the Unix socket in listening mode.
 @end table

+@section libzmq
+
+ZeroMQ asynchronous messaging library.
+
+This library supports unicast streaming to multiple clients without relying on
+an external server.
+
+The required syntax for streaming or connecting to a stream is:
+@example
+zmq:tcp://ip-address:port
+@end example
+
+Example:
+Create a localhost stream on port 5555:
+@example
+ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
+@end example
+
+Multiple clients may connect to the stream using:
+@example
+ffplay zmq:tcp://127.0.0.1:5555
+@end example
+
+Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
+The server binds to a port and publishes data. Clients connect to the
+server (IP address/port) and subscribe to the stream. The order in which
+the server and client start generally does not matter.
+
+ffmpeg must be compiled with the --enable-libzmq option to support
+this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
+for an example on how this option may be set.

I think I'd rather not reference the compilation guide, as there are no specific instructions there to compile with libzmq. If you insist, then at least lose the "Ubuntu" part from the link.

+
 @c man end PROTOCOLS
diff --git a/libavformat/Makefile b/libavformat/Makefile
index a434b005a4..efa3a112ae 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
 OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
 OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
 OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
+OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o

 # libavdevice dependencies
 OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
new file mode 100644
index 0000000000..ac35c01cf8
--- /dev/null
+++ b/libavformat/libzmq.c
@@ -0,0 +1,158 @@
+/*
+ * ZMQ URLProtocol
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zmq.h>
+#include "url.h"
+#include "network.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+
+typedef struct ZMQContext {
+    const AVClass *class;
+    void *context;
+    void *socket;
+} ZMQContext;
+
+static const AVOption options[] = {
+    { NULL }
+};
+
+static int ff_zmq_open(URLContext *h, const char *uri, int flags)
+{
+    int ret;
+    ZMQContext *s   = h->priv_data;
+    s->context      = zmq_ctx_new();

You should check if the context creation was successful.

+    h->is_streamed  = 1;
+
+    av_strstart(uri, "zmq:", &uri);
+
+    /*publish during write*/
+    if (h->flags & AVIO_FLAG_WRITE) {
+        s->socket = zmq_socket(s->context, ZMQ_PUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", 
zmq_strerror(errno));

zmq_errno() instead of errno? Same goes for all similar cases.

+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        ret = zmq_bind(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", 
zmq_strerror(errno));
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+
+    /*subscribe for read*/
+    if (h->flags & AVIO_FLAG_READ) {
+        s->socket = zmq_socket(s->context, ZMQ_SUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", 
zmq_strerror(errno));
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
+        ret = zmq_connect(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): 
%s\n", zmq_strerror(errno));
+            zmq_close(s->socket);
+            zmq_ctx_destroy(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+    return 0;
+}
+
+static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+
+    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);

I can see that you are using non-blocking mode. A poll-with-timeout approach is preferred; see how the tcp or unix protocols do it.

+    if (ret >= 0)
+        return ret; /*number of sent bytes*/
+
+    /*errno = EAGAIN if messages cannot be pushed*/
+    if (ret == -1 && errno == EAGAIN) {
+        return AVERROR(EAGAIN);
+    } else
+        return AVERROR_EXTERNAL;
+}
+
+static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+    zmq_msg_t msg;
+    int msg_size;
+
+    ret = zmq_msg_init(&msg);
+    if (ret == -1) {
+      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", 
zmq_strerror(errno));
+      return AVERROR_EXTERNAL;
+    }
+
+    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);

Same here: polling with a timeout is preferred.

+    if (ret == -1) {
+        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
+        if (ret == AVERROR_EXTERNAL)
+          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", 
zmq_strerror(errno));

indentation

+        goto finish;
+    }
+
+    msg_size = zmq_msg_size(&msg);
+    if (msg_size > size) {
+        msg_size = size;
+        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the 
buffer. Message will be truncated\n");

A user-settable pkt_size option would probably be useful here: it would set the URLContext max_packet_size, which basically controls the size of the allocated IO buffer.

+    }
+    memcpy(buf, zmq_msg_data(&msg), msg_size);

If you are truncating anyway then please use zmq_recv directly, so you can avoid the memcpy.

+
+finish:
+    zmq_msg_close(&msg);
+    return ret;
+}
+
+static int ff_zmq_close(URLContext *h)
+{
+    ZMQContext *s = h->priv_data;
+    zmq_close(s->socket);
+    zmq_ctx_destroy(s->context);
+    return 0;
+}
+
+static const AVClass zmq_context_class = {
+    .class_name = "zmq",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libzmq_protocol = {
+    .name            = "zmq",
+    .url_close       = ff_zmq_close,
+    .url_open        = ff_zmq_open,
+    .url_read        = ff_zmq_read,
+    .url_write       = ff_zmq_write,
+    .priv_data_size  = sizeof(ZMQContext),
+    .priv_data_class = &zmq_context_class,
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index ad95659795..face5b29b5 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
 extern const URLProtocol ff_libsrt_protocol;
 extern const URLProtocol ff_libssh_protocol;
 extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_libzmq_protocol;

 #include "libavformat/protocol_list.c"

--
2.22.0


ping

Regards,
Marton
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".

Reply via email to