Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On Wed, Dec 02, 2015 at 03:13:33PM +0100, Nicolas George wrote:
> On duodi 12 Frimaire, year CCXXIV, Clement Boesch wrote:
> > because concurrent read/write accesses on the fifo need to be locked?
>
> No, of course not.

I was talking about the fifo itself, not the threadmessage API.

> > I don't understand...
>
> *I* do not understand what you need, what you want to do with this API. To
> discard all elements, just read them until there are no more, no need of
> locking (except internally to read them, of course), no need for anything
> else. So what am I missing?

I'd say it saves the sender (application side) from having to become a
reader itself, reading every message concurrently with the dedicated
reader(s) just to discard the messages before those readers get to them.

In case I'm not clear, one simple scenario would be the following:

- there is a large queue of packets, almost filled
- there is one fast sender filling the queue (the demuxer)
- there is a bunch of slow readers reading from the queue (the decoders)
- the sender "receives" a seek event from the user
- the sender now wants to stop as fast as possible readers (decoders)
  from reading all kind of packets

If that sender has to lock for every message it's going to read & drop,
while the decoders are still fighting to read them from the queue, it
looks very ineffective to me.

Hopefully this clarifies. I'm not sure what kind of similar information I
could write in the doxy. If you have any suggestion...

I will send a new patchset very soon (with important bug fixes and
improvements)

Regards,

--
Clément B.

signature.asc
Description: PGP signature
___
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
http://ffmpeg.org/mailman/listinfo/ffmpeg-devel
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On duodi 12 Frimaire, year CCXXIV, Clement Boesch wrote:
> - there is a large queue of packets, almost filled
> - there is one fast sender filling the queue (the demuxer)
> - there is a bunch of slow readers reading from the queue (the decoders)
> - the sender "receives" a seek event from the user
> - the sender now wants to stop as fast as possible readers (decoders)
>   from reading all kind of packets

Ok.

> If that sender has to lock for every message it's going to read & drop,
> while the decoders are still fighting to read them from the queue, it
> looks very ineffective to me.

This, especially "looks very ineffective", sounds like premature
optimization. I am not against it, but then the documentation should say
that it is an optimization. And conversely, if it is not only a matter of
efficiency but also of correctness, the documentation must explain what the
risk is.

Regards,

--
  Nicolas George
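The efficiency-versus-correctness question can be made concrete with a small single-threaded simulation of the seek scenario quoted above. This is only a sketch under stated assumptions: `ToyQueue` and every `toy_*` / `stale_*` name are hypothetical and are not part of libavutil, and the "decoder turn" is a simulated round-robin interleaving rather than real thread scheduling. It counts how many stale packets a decoder still consumes when the sender discards messages one at a time, compared with a flush done as one step.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy queue for illustration; NOT the AVThreadMessageQueue API. */
enum { TOY_CAP = 16 };

typedef struct ToyQueue {
    int    items[TOY_CAP];  /* packet ids, oldest first */
    size_t head;            /* index of the oldest packet */
    size_t count;           /* number of queued packets */
} ToyQueue;

/* Fill the queue with n stale packets numbered 0..n-1. */
static void toy_fill(ToyQueue *q, size_t n)
{
    assert(n <= TOY_CAP);
    q->head  = 0;
    q->count = n;
    for (size_t i = 0; i < n; i++)
        q->items[i] = (int)i;
}

/* Pop the oldest packet; returns 1 on success, 0 if the queue is empty. */
static int toy_pop(ToyQueue *q, int *out)
{
    if (q->count == 0)
        return 0;
    *out    = q->items[q->head];
    q->head = (q->head + 1) % TOY_CAP;
    q->count--;
    return 1;
}

/* Drop everything in one step, as a flush under a single lock would. */
static void toy_flush(ToyQueue *q)
{
    q->head  = 0;
    q->count = 0;
}

/*
 * The sender discards one packet at a time; between two sender reads a
 * decoder gets a turn (assumed worst-case interleaving). Returns the number
 * of stale packets that reached a decoder.
 */
static size_t stale_after_drain(ToyQueue *q)
{
    size_t leaked = 0;
    int pkt;

    while (toy_pop(q, &pkt)) {   /* sender reads and drops */
        if (toy_pop(q, &pkt))    /* decoder turn */
            leaked++;
    }
    return leaked;
}

/* The sender flushes first; the decoder turn then finds nothing to read. */
static size_t stale_after_flush(ToyQueue *q)
{
    size_t leaked = 0;
    int pkt;

    toy_flush(q);
    if (toy_pop(q, &pkt))        /* decoder turn */
        leaked++;
    return leaked;
}
```

Under this assumed interleaving, half of the stale packets still reach a decoder in the per-message case, while none do after the one-step flush. Real scheduling would give different numbers, but the sketch suggests the difference is not purely one of speed.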
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On Tue, Dec 01, 2015 at 06:49:29PM +0100, Nicolas George wrote:
> On primidi 11 Frimaire, year CCXXIV, Clement Boesch wrote:
> > Ah. Well then the user can not do it himself since he has no way of
> > acquiring the queue lock (AVThreadMessageQueue is opaque)
>
> Yes, but why does it matter, if the elements are to be discarded anyway.
>

because concurrent read/write accesses on the fifo need to be locked?

I don't understand...

--
Clément B.
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On duodi 12 Frimaire, year CCXXIV, Clement Boesch wrote:
> because concurrent read/write accesses on the fifo need to be locked?

No, of course not.

> I don't understand...

*I* do not understand what you need, what you want to do with this API. To
discard all elements, just read them until there are no more, no need of
locking (except internally to read them, of course), no need for anything
else. So what am I missing?

Regards,

--
  Nicolas George
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On primidi 11 Frimaire, year CCXXIV, Clement Boesch wrote:
> Ah. Well then the user can not do it himself since he has no way of
> acquiring the queue lock (AVThreadMessageQueue is opaque)

Yes, but why does it matter, if the elements are to be discarded anyway.

Ah, I think I see a point: this API is for the sender, not the receiver, and
you are afraid that the receiver can read one element in the middle while
the sender is flushing. Is that it?

Regards,

--
  Nicolas George
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On decadi 10 Frimaire, year CCXXIV, Clement Boesch wrote:
> From: Clément Bœsch
>
> ---
>  libavutil/threadmessage.c | 37 ++---
>  libavutil/threadmessage.h | 21 ++---
>  2 files changed, 52 insertions(+), 6 deletions(-)
>
> diff --git a/libavutil/threadmessage.c b/libavutil/threadmessage.c
> index b7fcbe2..87ce8dc 100644
> --- a/libavutil/threadmessage.c
> +++ b/libavutil/threadmessage.c
> @@ -40,14 +40,16 @@ struct AVThreadMessageQueue {
>      int err_send;
>      int err_recv;
>      unsigned elsize;
> +    void (*free_func)(void *msg);
>  #else
>      int dummy;
>  #endif
>  };
>
> -int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
> -                                  unsigned nelem,
> -                                  unsigned elsize)
> +int av_thread_message_queue_alloc2(AVThreadMessageQueue **mq,
> +                                   unsigned nelem,
> +                                   unsigned elsize,
> +                                   void (*free_func)(void *msg))

I must say I am not very fond of this aspect of the change, because of
foobar2 and numerous function arguments.

What about having the application set queue->free_func with a second
function if needed, either directly or through
av_thread_message_queue_set_free_func()?

>  {
>  #if HAVE_THREADS
>      AVThreadMessageQueue *rmq;
> @@ -73,6 +75,7 @@ int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
>          return AVERROR(ret);
>      }
>      rmq->elsize = elsize;
> +    rmq->free_func = free_func;
>      *mq = rmq;
>      return 0;
>  #else
> @@ -81,10 +84,18 @@ int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
>  #endif /* HAVE_THREADS */
>  }
>
> +int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
> +                                  unsigned nelem,
> +                                  unsigned elsize)
> +{
> +    return av_thread_message_queue_alloc2(mq, nelem, elsize, NULL);
> +}
> +
>  void av_thread_message_queue_free(AVThreadMessageQueue **mq)
>  {
>  #if HAVE_THREADS
>      if (*mq) {
> +        av_thread_message_flush(*mq);
>          av_fifo_freep(&(*mq)->fifo);
>          pthread_cond_destroy(&(*mq)->cond);
>          pthread_mutex_destroy(&(*mq)->lock);
> @@ -182,3 +193,23 @@ void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
>      pthread_mutex_unlock(&mq->lock);
>  #endif /* HAVE_THREADS */
>  }
> +
> +void av_thread_message_flush(AVThreadMessageQueue *mq)
> +{
> +#if HAVE_THREADS
> +    int used, off;
> +
> +    pthread_mutex_lock(&mq->lock);
> +    used = av_fifo_size(mq->fifo);
> +    if (mq->free_func) {
> +        for (off = 0; off < used; off += mq->elsize) {
> +            void *msg;
> +            av_fifo_generic_peek_at(mq->fifo, &msg, off, mq->elsize, NULL);
> +            mq->free_func(msg);
> +        }
> +    }
> +    av_fifo_drain(mq->fifo, used);
> +    pthread_cond_broadcast(&mq->cond);
> +    pthread_mutex_unlock(&mq->lock);
> +#endif /* HAVE_THREADS */
> +}
> diff --git a/libavutil/threadmessage.h b/libavutil/threadmessage.h
> index a8481d8..f9004a8 100644
> --- a/libavutil/threadmessage.h
> +++ b/libavutil/threadmessage.h
> @@ -33,17 +33,27 @@ typedef enum AVThreadMessageFlags {
>  } AVThreadMessageFlags;
>
>  /**
> + * @deprecated use av_thread_message_queue_alloc2 instead
> + */
> +attribute_deprecated
> +int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
> +                                  unsigned nelem,
> +                                  unsigned elsize);
> +
> +/**
>   * Allocate a new message queue.
>   *
>   * @param mq pointer to the message queue
>   * @param nelem maximum number of elements in the queue
>   * @param elsize size of each element in the queue
> + * @param free_func free message callback function, can be NULL
>   * @return >=0 for success; <0 for error, in particular AVERROR(ENOSYS) if
>   * lavu was built without thread support
>   */
> -int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
> -                                  unsigned nelem,
> -                                  unsigned elsize);
> +int av_thread_message_queue_alloc2(AVThreadMessageQueue **mq,
> +                                   unsigned nelem,
> +                                   unsigned elsize,
> +                                   void (*free_func)(void *msg));
>
>  /**
>   * Free a message queue.
> @@ -88,4 +98,9 @@ void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
>  void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
>                                            int err);
>
> +/**
> + * Flush the message queue
> + */
> +void av_thread_message_flush(AVThreadMessageQueue *mq);

Can you explain in the doxy (and to me) how it is better than:

    while (av_thread_message_queue_recv_locked(mq, msg, AV_THREAD_MESSAGE_NONBLOCK))
        free_func(msg);

?

> +
>  #endif /*
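The alternative raised in this review, keeping the allocator's argument list short and installing the callback through a separate setter, can be sketched as follows. This is a toy under stated assumptions: `DemoQueue` and all `demo_*` names are hypothetical stand-ins, the queue stores plain pointers and has no locking, and nothing here is the libavutil implementation.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical queue type for illustration; NOT AVThreadMessageQueue. */
typedef struct DemoQueue {
    void   **slots;                  /* queued message pointers */
    unsigned nelem;                  /* capacity */
    unsigned used;                   /* number of queued messages */
    void   (*free_func)(void *msg);  /* optional, NULL by default */
} DemoQueue;

/* The allocator keeps a short argument list. Returns 0 on success, -1 on error. */
static int demo_queue_alloc(DemoQueue **q, unsigned nelem)
{
    DemoQueue *r;

    assert(nelem > 0);
    r = calloc(1, sizeof(*r));
    if (!r)
        return -1;
    r->slots = calloc(nelem, sizeof(*r->slots));
    if (!r->slots) {
        free(r);
        return -1;
    }
    r->nelem = nelem;
    *q = r;
    return 0;
}

/* Optional second step: only callers that need a callback have to call it. */
static void demo_queue_set_free_func(DemoQueue *q, void (*free_func)(void *msg))
{
    q->free_func = free_func;
}

/* Queue one message pointer. Returns 0 on success, -1 if the queue is full. */
static int demo_queue_send(DemoQueue *q, void *msg)
{
    if (q->used == q->nelem)
        return -1;
    q->slots[q->used++] = msg;
    return 0;
}

/* Free the queue, releasing leftover messages through the callback if set. */
static void demo_queue_free(DemoQueue **q)
{
    if (!*q)
        return;
    if ((*q)->free_func)
        for (unsigned i = 0; i < (*q)->used; i++)
            (*q)->free_func((*q)->slots[i]);
    free((*q)->slots);
    free(*q);
    *q = NULL;
}

/* Example callback that counts how many messages it released. */
static int demo_freed;

static void demo_count_free(void *msg)
{
    free(msg);
    demo_freed++;
}
```

With this shape, existing callers of the allocator are left alone and no deprecated wrapper or `foobar2`-style name is needed; the cost is that the callback is set in a second call rather than at construction time.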
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On Tue, Dec 01, 2015 at 05:11:06PM +0100, Nicolas George wrote:
> On decadi 10 Frimaire, year CCXXIV, Clement Boesch wrote:
> > From: Clément Bœsch
> >
> > ---
> >  libavutil/threadmessage.c | 37 ++---
> >  libavutil/threadmessage.h | 21 ++---
> >  2 files changed, 52 insertions(+), 6 deletions(-)
> >
> > diff --git a/libavutil/threadmessage.c b/libavutil/threadmessage.c
> > index b7fcbe2..87ce8dc 100644
> > --- a/libavutil/threadmessage.c
> > +++ b/libavutil/threadmessage.c
> > @@ -40,14 +40,16 @@ struct AVThreadMessageQueue {
> >      int err_send;
> >      int err_recv;
> >      unsigned elsize;
> > +    void (*free_func)(void *msg);
> >  #else
> >      int dummy;
> >  #endif
> >  };
> >
> > -int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
> > -                                  unsigned nelem,
> > -                                  unsigned elsize)
> > +int av_thread_message_queue_alloc2(AVThreadMessageQueue **mq,
> > +                                   unsigned nelem,
> > +                                   unsigned elsize,
> > +                                   void (*free_func)(void *msg))
>
> I must say I am not very fond of this aspect of the change, because of
> foobar2 and numerous function arguments.
>
> What about having the application set queue->free_func with a second
> function if needed, either directly or through
> av_thread_message_queue_set_free_func()?
>

Yeah, sure, no opinion really. Will send a new patch.

[...]

> > +/**
> > + * Flush the message queue
> > + */
> > +void av_thread_message_flush(AVThreadMessageQueue *mq);
>
> Can you explain in the doxy (and to me) how it is better than:
>
>     while (av_thread_message_queue_recv_locked(mq, msg, AV_THREAD_MESSAGE_NONBLOCK))
>         free_func(msg);
>

current:

    int used, off;

    pthread_mutex_lock(&mq->lock);
    used = av_fifo_size(mq->fifo);
    if (mq->free_func) {
        for (off = 0; off < used; off += mq->elsize) {
            void *msg;
            av_fifo_generic_peek_at(mq->fifo, &msg, off, mq->elsize, NULL);
            mq->free_func(msg);
        }
    }
    av_fifo_drain(mq->fifo, used);
    pthread_cond_broadcast(&mq->cond);
    pthread_mutex_unlock(&mq->lock);

yours:

    void *msg;

    pthread_mutex_lock(&mq->lock);
    while (av_thread_message_queue_recv_locked(mq, msg, AV_THREAD_MESSAGE_NONBLOCK))
        if (mq->free_func)
            mq->free_func(msg);
    pthread_mutex_unlock(&mq->lock);

Your suggestion is half the size. I don't mind that much but I guess I like
saving a few checks and preventing a bunch of signal broadcasting. Also,
passing the AV_THREAD_MESSAGE_NONBLOCK seems a bit like a (smart) hack to
me. I will pick your solution (which I admit I hadn't thought of initially)
if you insist, but I prefer my version.

[...]

--
Clément B.
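The "bunch of signal broadcasting" argument can be illustrated by counting wake-ups in a toy model of the two variants compared above. This is a sketch under stated assumptions: `CmpQueue` and the `cmp_*` functions are hypothetical and only mimic the shape of the two code fragments. In particular, the toy receive returns 0 on success and `-EAGAIN` when empty, and it signals the condition variable after every successful read; both behaviors are assumptions of this model, not statements about libavutil.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical toy queue for illustration; NOT AVThreadMessageQueue. */
enum { CMP_CAP = 32 };

typedef struct CmpQueue {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int      items[CMP_CAP];
    unsigned head, used;
    unsigned signals;   /* condition-variable wake-ups issued so far */
} CmpQueue;

/* Initialize the queue and pre-fill it with n messages. */
static void cmp_init(CmpQueue *q, unsigned n)
{
    assert(n <= CMP_CAP);
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->cond, NULL);
    q->head    = 0;
    q->used    = n;
    q->signals = 0;
    for (unsigned i = 0; i < n; i++)
        q->items[i] = (int)i;
}

/* Caller must hold q->lock. Returns 0 on success, -EAGAIN if empty. */
static int cmp_recv_locked(CmpQueue *q, int *out)
{
    if (!q->used)
        return -EAGAIN;
    *out    = q->items[q->head];
    q->head = (q->head + 1) % CMP_CAP;
    q->used--;
    pthread_cond_signal(&q->cond);   /* one wake-up per message read */
    q->signals++;
    return 0;
}

/* Variant 1: flush by receiving every message in a loop under the lock. */
static void cmp_flush_by_recv(CmpQueue *q)
{
    int msg;

    pthread_mutex_lock(&q->lock);
    while (cmp_recv_locked(q, &msg) == 0)
        ;                            /* a real caller would release msg here */
    pthread_mutex_unlock(&q->lock);
}

/* Variant 2: drop everything at once, then wake waiters a single time. */
static void cmp_flush_by_drain(CmpQueue *q)
{
    pthread_mutex_lock(&q->lock);
    q->head = 0;
    q->used = 0;
    pthread_cond_broadcast(&q->cond);
    q->signals++;
    pthread_mutex_unlock(&q->lock);
}
```

In this model the receive loop issues one wake-up per queued message, while the drain variant issues exactly one regardless of queue depth. Whether that difference matters in practice depends on how many threads are waiting on the condition variable.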
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On primidi 11 Frimaire, year CCXXIV, Clement Boesch wrote:
> Your suggestion is half the size.

I guess I was not clear enough: I was not asking about the merits of this
implementation over yours, but about the usefulness of the function for the
application compared to reading and discarding each message in turn. I guess
it is more efficient, but there are probably other reasons, otherwise you
would not have bothered.

Regards,

--
  Nicolas George
Re: [FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
On Tue, Dec 01, 2015 at 05:34:39PM +0100, Nicolas George wrote:
> On primidi 11 Frimaire, year CCXXIV, Clement Boesch wrote:
> > Your suggestion is half the size.
>
> I guess I was not clear enough: I was not asking about the merits of this
> implementation over yours, but about the usefulness of the function for the
> application compared to reading and discarding each message in turn. I guess
> it is more efficient, but there are probably other reasons, otherwise you
> would not have bothered.
>

Ah. Well then the user can not do it himself since he has no way of
acquiring the queue lock (AVThreadMessageQueue is opaque)

--
Clément B.
[FFmpeg-devel] [PATCH 1/2] avutil/threadmessage: add av_thread_message_flush()
From: Clément Bœsch

---
 libavutil/threadmessage.c | 37 ++---
 libavutil/threadmessage.h | 21 ++---
 2 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/libavutil/threadmessage.c b/libavutil/threadmessage.c
index b7fcbe2..87ce8dc 100644
--- a/libavutil/threadmessage.c
+++ b/libavutil/threadmessage.c
@@ -40,14 +40,16 @@ struct AVThreadMessageQueue {
     int err_send;
     int err_recv;
     unsigned elsize;
+    void (*free_func)(void *msg);
 #else
     int dummy;
 #endif
 };

-int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
-                                  unsigned nelem,
-                                  unsigned elsize)
+int av_thread_message_queue_alloc2(AVThreadMessageQueue **mq,
+                                   unsigned nelem,
+                                   unsigned elsize,
+                                   void (*free_func)(void *msg))
 {
 #if HAVE_THREADS
     AVThreadMessageQueue *rmq;
@@ -73,6 +75,7 @@ int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
         return AVERROR(ret);
     }
     rmq->elsize = elsize;
+    rmq->free_func = free_func;
     *mq = rmq;
     return 0;
 #else
@@ -81,10 +84,18 @@ int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
 #endif /* HAVE_THREADS */
 }

+int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
+                                  unsigned nelem,
+                                  unsigned elsize)
+{
+    return av_thread_message_queue_alloc2(mq, nelem, elsize, NULL);
+}
+
 void av_thread_message_queue_free(AVThreadMessageQueue **mq)
 {
 #if HAVE_THREADS
     if (*mq) {
+        av_thread_message_flush(*mq);
         av_fifo_freep(&(*mq)->fifo);
         pthread_cond_destroy(&(*mq)->cond);
         pthread_mutex_destroy(&(*mq)->lock);
@@ -182,3 +193,23 @@ void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
     pthread_mutex_unlock(&mq->lock);
 #endif /* HAVE_THREADS */
 }
+
+void av_thread_message_flush(AVThreadMessageQueue *mq)
+{
+#if HAVE_THREADS
+    int used, off;
+
+    pthread_mutex_lock(&mq->lock);
+    used = av_fifo_size(mq->fifo);
+    if (mq->free_func) {
+        for (off = 0; off < used; off += mq->elsize) {
+            void *msg;
+            av_fifo_generic_peek_at(mq->fifo, &msg, off, mq->elsize, NULL);
+            mq->free_func(msg);
+        }
+    }
+    av_fifo_drain(mq->fifo, used);
+    pthread_cond_broadcast(&mq->cond);
+    pthread_mutex_unlock(&mq->lock);
+#endif /* HAVE_THREADS */
+}
diff --git a/libavutil/threadmessage.h b/libavutil/threadmessage.h
index a8481d8..f9004a8 100644
--- a/libavutil/threadmessage.h
+++ b/libavutil/threadmessage.h
@@ -33,17 +33,27 @@ typedef enum AVThreadMessageFlags {
 } AVThreadMessageFlags;

 /**
+ * @deprecated use av_thread_message_queue_alloc2 instead
+ */
+attribute_deprecated
+int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
+                                  unsigned nelem,
+                                  unsigned elsize);
+
+/**
  * Allocate a new message queue.
  *
  * @param mq pointer to the message queue
  * @param nelem maximum number of elements in the queue
  * @param elsize size of each element in the queue
+ * @param free_func free message callback function, can be NULL
  * @return >=0 for success; <0 for error, in particular AVERROR(ENOSYS) if
  * lavu was built without thread support
  */
-int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
-                                  unsigned nelem,
-                                  unsigned elsize);
+int av_thread_message_queue_alloc2(AVThreadMessageQueue **mq,
+                                   unsigned nelem,
+                                   unsigned elsize,
+                                   void (*free_func)(void *msg));

 /**
  * Free a message queue.
@@ -88,4 +98,9 @@ void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
 void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
                                           int err);

+/**
+ * Flush the message queue
+ */
+void av_thread_message_flush(AVThreadMessageQueue *mq);
+
 #endif /* AVUTIL_THREADMESSAGE_H */
--
2.6.2