Marton,

Thanks for reviewing this patch.

On Sat, 24. Aug 19:33, Marton Balint wrote:
>
>
> On Fri, 23 Aug 2019, Andriy Gelman wrote:
>
> > On Mon, 19. Aug 17:28, Andriy Gelman wrote:
> > > Minor changes in v3:
> > > 1. Removed tab character from as per feedback
> > > 2. Removed unused timeout variable from ZMQContext
> > >
> > > Andriy
> >
> > > From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
> > > From: Andriy Gelman <andriy.gel...@gmail.com>
> > > Date: Tue, 30 Jul 2019 14:39:32 -0400
> > > Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option
> > >
> > > Currently multiple clients are only supported by using a multicast
> > > destination address. An alternative is to stream to a server which
> > > re-distributes the content. This commit adds ZeroMQ as a protocol
> > > option, which allows multiple clients to connect to a single ffmpeg
> > > instance.
> > > ---
> > >  configure               |   2 +
> > >  doc/general.texi        |   1 +
> > >  doc/protocols.texi      |  32 ++++++++
> > >  libavformat/Makefile    |   1 +
> > >  libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
> > >  libavformat/protocols.c |   1 +
> > >  6 files changed, 195 insertions(+)
> > >  create mode 100644 libavformat/libzmq.c
>
> Missing changelog entry and libavformat minor version bump.

OK, will update.

>
> > > diff --git a/configure b/configure
> > > index c09c842809..a4134024c2 100755
> > > --- a/configure
> > > +++ b/configure
> > > @@ -3411,6 +3411,8 @@ libsrt_protocol_deps="libsrt"
> > >  libsrt_protocol_select="network"
> > >  libssh_protocol_deps="libssh"
> > >  libtls_conflict="openssl gnutls mbedtls"
> > > +libzmq_protocol_deps="libzmq"
> > > +libzmq_protocol_select="network"
>
> You may want to enforce a minimum version requirement for libzmq in the
> pkg_config part of configure depending on the API you use.

OK.

>
> > >
> > >  # filters
> > >  afftdn_filter_deps="avcodec"
> > > diff --git a/doc/general.texi b/doc/general.texi
> > > index 3c0c803449..b8e063268c 100644
> > > --- a/doc/general.texi
> > > +++ b/doc/general.texi
> > > @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
> > >  @item TCP @tab X
> > >  @item TLS @tab X
> > >  @item UDP @tab X
> > > +@item ZMQ @tab E
> > >  @end multitable
> > >
> > >  @code{X} means that the protocol is supported.
> > > diff --git a/doc/protocols.texi b/doc/protocols.texi
> > > index 3e4e7af3d4..174eaacd0f 100644
> > > --- a/doc/protocols.texi
> > > +++ b/doc/protocols.texi
> > > @@ -1728,4 +1728,36 @@ Timeout in ms.
> > >  Create the Unix socket in listening mode.
> > >  @end table
> > >
> > > +@section libzmq
> > > +
> > > +ZeroMQ asynchronous messaging library.
> > > +
> > > +This library supports unicast streaming to multiple clients without relying on
> > > +an external server.
> > > +
> > > +The required syntax for streaming or connecting to a stream is:
> > > +@example
> > > +zmq:tcp://ip-address:port
> > > +@end example
> > > +
> > > +Example:
> > > +Create a localhost stream on port 5555:
> > > +@example
> > > +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
> > > +@end example
> > > +
> > > +Multiple clients may connect to the stream using:
> > > +@example
> > > +ffplay zmq:tcp://127.0.0.1:5555
> > > +@end example
> > > +
> > > +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
> > > +The server binds to a port and publishes data. Clients connect to the
> > > +server (IP address/port) and subscribe to the stream. The order in which
> > > +the server and client start generally does not matter.
> > > +
> > > +ffmpeg must be compiled with the --enable-libzmq option to support
> > > +this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
> > > +for an example on how this option may be set.
>
> I think I'd rather not reference the compilation guide, as there are no
> specific instructions there to compile with libzmq. If you insist, then at
> least lose the "Ubuntu" part from the link.

I'll remove the reference to the compilation guide.

>
> > > +
> > >  @c man end PROTOCOLS
> > > diff --git a/libavformat/Makefile b/libavformat/Makefile
> > > index a434b005a4..efa3a112ae 100644
> > > --- a/libavformat/Makefile
> > > +++ b/libavformat/Makefile
> > > @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL) += librtmp.o
> > >  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL) += libsmbclient.o
> > >  OBJS-$(CONFIG_LIBSRT_PROTOCOL) += libsrt.o
> > >  OBJS-$(CONFIG_LIBSSH_PROTOCOL) += libssh.o
> > > +OBJS-$(CONFIG_LIBZMQ_PROTOCOL) += libzmq.o
> > >
> > >  # libavdevice dependencies
> > >  OBJS-$(CONFIG_IEC61883_INDEV) += dv.o
> > > diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
> > > new file mode 100644
> > > index 0000000000..ac35c01cf8
> > > --- /dev/null
> > > +++ b/libavformat/libzmq.c
> > > @@ -0,0 +1,158 @@
> > > +/*
> > > + * ZMQ URLProtocol
> > > + *
> > > + * This file is part of FFmpeg.
> > > + *
> > > + * FFmpeg is free software; you can redistribute it and/or
> > > + * modify it under the terms of the GNU Lesser General Public
> > > + * License as published by the Free Software Foundation; either
> > > + * version 2.1 of the License, or (at your option) any later version.
> > > + *
> > > + * FFmpeg is distributed in the hope that it will be useful,
> > > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> > > + * Lesser General Public License for more details.
> > > + *
> > > + * You should have received a copy of the GNU Lesser General Public
> > > + * License along with FFmpeg; if not, write to the Free Software
> > > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > > + */
> > > +
> > > +#include <zmq.h>
> > > +#include "url.h"
> > > +#include "network.h"
> > > +#include "libavutil/avstring.h"
> > > +#include "libavutil/opt.h"
> > > +
> > > +typedef struct ZMQContext {
> > > +    const AVClass *class;
> > > +    void *context;
> > > +    void *socket;
> > > +} ZMQContext;
> > > +
> > > +static const AVOption options[] = {
> > > +    { NULL }
> > > +};
> > > +
> > > +static int ff_zmq_open(URLContext *h, const char *uri, int flags)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s = h->priv_data;
> > > +    s->context = zmq_ctx_new();
>
> You should check if the context creation was successful.

Thanks, I'll add the check.

>
> > > +    h->is_streamed = 1;
> > > +
> > > +    av_strstart(uri, "zmq:", &uri);
> > > +
> > > +    /*publish during write*/
> > > +    if (h->flags & AVIO_FLAG_WRITE) {
> > > +        s->socket = zmq_socket(s->context, ZMQ_PUB);
> > > +        if (!s->socket) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
>
> zmq_errno() instead of errno? Same goes for all similar cases.

The documentation says to use zmq_errno() on non-POSIX systems, but it also
states: "Users not experiencing issues with retrieving the correct value of
errno should not use this function and should instead access the errno
variable directly."
On IRC, J_Darnley pointed out that ffmpeg.c includes errno.h without any
checks, so it seems reasonable to assume that errno is available.
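
One general point when reading errno directly is that any call made between the failing function and the log statement may overwrite it. Below is a minimal sketch of saving the value first. It uses plain POSIX open() instead of libzmq so that it builds on its own, and the helper name open_or_report() is hypothetical and not part of the patch.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper, not part of the patch: open a file read-only and,
 * on failure, format an error message from the saved errno value. */
int open_or_report(const char *path, char *msg, size_t msg_len)
{
    int fd;

    assert(path && msg && msg_len > 0);
    fd = open(path, O_RDONLY);
    if (fd < 0) {
        int err = errno; /* save first: later calls may overwrite errno */
        snprintf(msg, msg_len, "open(%s): %s", path, strerror(err));
        return -err;     /* negative errno value */
    }
    msg[0] = '\0';
    return fd;           /* caller owns the descriptor and must close() it */
}
```

The same idea would apply to the zmq_*() calls: copy errno into a local variable right after the failing call, then use that copy for both the EAGAIN test and the log message.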

>
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +
> > > +        ret = zmq_bind(s->socket, uri);
> > > +        if (ret == -1) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
> > > +            zmq_close(s->socket);
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +    }
> > > +
> > > +    /*subscribe for read*/
> > > +    if (h->flags & AVIO_FLAG_READ) {
> > > +        s->socket = zmq_socket(s->context, ZMQ_SUB);
> > > +        if (!s->socket) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +
> > > +        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
> > > +        ret = zmq_connect(s->socket, uri);
> > > +        if (ret == -1) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
> > > +            zmq_close(s->socket);
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +    }
> > > +    return 0;
> > > +}
> > > +
> > > +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s = h->priv_data;
> > > +
> > > +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
>
> I can see that you are using non-blocking mode. A polling with timeout
> approach is preferred, see how tcp or unix does it.

I used polling in the initial patch, but I switched to non-blocking because I
thought it was a cleaner solution.
I'll revert to polling with a timeout in the next version.
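
For reference, here is a rough sketch of the polling-with-timeout pattern on a plain file descriptor, using POSIX poll(). It only illustrates the idea of waiting in short slices up to a total timeout. It is not the code from tcp.c or from the next patch version. The names wait_fd_timeout() and demo_wait_on_pipe() and the 100 ms slice are assumptions made for the example. For a ZMQ socket the wait would presumably go through zmq_poll() rather than poll().

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <unistd.h>

#define POLL_SLICE_MS 100 /* assumed slice length for this sketch */

/* Wait until fd is readable (for_write == 0) or writable (for_write != 0).
 * Returns 0 when poll() reports an event, -ETIMEDOUT once timeout_ms has
 * elapsed, or a negative errno value if poll() itself fails. */
int wait_fd_timeout(int fd, int for_write, int timeout_ms)
{
    struct pollfd p = { .fd = fd, .events = for_write ? POLLOUT : POLLIN };
    int waited = 0;

    assert(fd >= 0 && timeout_ms >= 0);
    for (;;) {
        int left  = timeout_ms - waited;
        int slice = left < POLL_SLICE_MS ? left : POLL_SLICE_MS;
        int ret   = poll(&p, 1, slice);

        if (ret > 0)
            return 0;               /* ready, or error/hang-up pending */
        if (ret < 0 && errno != EINTR)
            return -errno;
        if (ret == 0) {             /* this slice elapsed without an event */
            waited += slice;
            if (waited >= timeout_ms)
                return -ETIMEDOUT;
        }
        /* a real protocol would also check for user interruption here */
    }
}

/* Usage sketch: an empty pipe times out, a filled one is ready at once. */
int demo_wait_on_pipe(void)
{
    int fds[2];
    int ok;

    if (pipe(fds) != 0)
        return -errno;
    ok = wait_fd_timeout(fds[0], 0, 50) == -ETIMEDOUT; /* nothing to read */
    ok = ok && write(fds[1], "x", 1) == 1;
    ok = ok && wait_fd_timeout(fds[0], 0, 50) == 0;     /* data is pending */
    close(fds[0]);
    close(fds[1]);
    return ok ? 0 : -1;
}
```

Waiting in slices rather than in one long poll() call leaves room to check for interruption between slices, which is the main reason this pattern is attractive over a single blocking wait.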

>
> > > +    if (ret >= 0)
> > > +        return ret; /*number of sent bytes*/
> > > +
> > > +    /*errno = EAGAIN if messages cannot be pushed*/
> > > +    if (ret == -1 && errno == EAGAIN) {
> > > +        return AVERROR(EAGAIN);
> > > +    } else
> > > +        return AVERROR_EXTERNAL;
> > > +}
> > > +
> > > +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s = h->priv_data;
> > > +    zmq_msg_t msg;
> > > +    int msg_size;
> > > +
> > > +    ret = zmq_msg_init(&msg);
> > > +    if (ret == -1) {
> > > +        av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
> > > +        return AVERROR_EXTERNAL;
> > > +    }
> > > +
> > > +    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);
>
> Same here, a polling with timeout is preferred.
>
> > > +    if (ret == -1) {
> > > +        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
> > > +        if (ret == AVERROR_EXTERNAL)
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));
>
> indentation
>
> > > +        goto finish;
> > > +    }
> > > +
> > > +    msg_size = zmq_msg_size(&msg);
> > > +    if (msg_size > size) {
> > > +        msg_size = size;
> > > +        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
>
> Probably a user settable pkt_size option would be useful which sets the
> URLContext max_packet_size which basically controls the size of the
> allocated IO buffer.
>
> > > +    }
> > > +    memcpy(buf, zmq_msg_data(&msg), msg_size);
>
> If you are truncating anyway then please use zmq_recv directly, so you can
> avoid the memcpy.

Yes, good idea on setting pkt_size and avoiding the memcpy.
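
As far as I understand the zmq_recv() documentation, the call writes at most len bytes into the caller's buffer but returns the full message size, so a return value larger than len means the message was truncated. A small sketch of how that return value could be interpreted follows. The helper recv_result_to_size() is hypothetical, and plain negative errno values stand in for the AVERROR codes so that the example builds without FFmpeg or libzmq.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical helper, not part of the patch: turn the result of a
 * zmq_recv()-style call into the number of bytes actually stored in a
 * buffer of buf_size bytes.
 *   ret         - return value of the receive call
 *   saved_errno - errno captured right after the call (used if ret < 0)
 *   truncated   - set to 1 if the message did not fit, else 0
 * Returns the stored byte count, -EAGAIN if no message was available, or
 * -EIO for any other failure (standing in for AVERROR_EXTERNAL). */
int recv_result_to_size(int ret, int saved_errno, int buf_size, int *truncated)
{
    assert(buf_size >= 0 && truncated);
    *truncated = 0;
    if (ret < 0)
        return saved_errno == EAGAIN ? -EAGAIN : -EIO;
    if (ret > buf_size) {
        *truncated = 1;   /* only the first buf_size bytes were stored */
        return buf_size;
    }
    return ret;
}
```

With this shape the truncation warning can be kept, keyed on the truncated flag, while the intermediate zmq_msg_t and the memcpy go away.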

>
> > > +
> > > +finish:
> > > +    zmq_msg_close(&msg);
> > > +    return ret;
> > > +}
> > > +
> > > +static int ff_zmq_close(URLContext *h)
> > > +{
> > > +    ZMQContext *s = h->priv_data;
> > > +    zmq_close(s->socket);
> > > +    zmq_ctx_destroy(s->context);
> > > +    return 0;
> > > +}
> > > +
> > > +static const AVClass zmq_context_class = {
> > > +    .class_name = "zmq",
> > > +    .item_name = av_default_item_name,
> > > +    .option = options,
> > > +    .version = LIBAVUTIL_VERSION_INT,
> > > +};
> > > +
> > > +const URLProtocol ff_libzmq_protocol = {
> > > +    .name = "zmq",
> > > +    .url_close = ff_zmq_close,
> > > +    .url_open = ff_zmq_open,
> > > +    .url_read = ff_zmq_read,
> > > +    .url_write = ff_zmq_write,
> > > +    .priv_data_size = sizeof(ZMQContext),
> > > +    .priv_data_class = &zmq_context_class,
> > > +    .flags = URL_PROTOCOL_FLAG_NETWORK,
> > > +};
> > > diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> > > index ad95659795..face5b29b5 100644
> > > --- a/libavformat/protocols.c
> > > +++ b/libavformat/protocols.c
> > > @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
> > >  extern const URLProtocol ff_libsrt_protocol;
> > >  extern const URLProtocol ff_libssh_protocol;
> > >  extern const URLProtocol ff_libsmbclient_protocol;
> > > +extern const URLProtocol ff_libzmq_protocol;
> > >
> > >  #include "libavformat/protocol_list.c"
> > >
> > > --
> > > 2.22.0
> > >
> >
> > ping
>
> Regards,
> Marton

Thanks,
Andriy
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".