Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-11 Thread Salva Alcántara
I am working on a `CoProcessFunction` that uses a third party library for detecting certain patterns of events based on some rules. So, in the end, the `ProcessElement1` method is basically forwarding the events to this library and registering a callback so that, when a match is detected, the CoPro

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Yun Gao
Hi Salva, As far as I know, 1. `out: Collector[T]` does not support access from multiple threads, namely only one thread may write records at a time. If multiple threads use `out`, the access needs to be coordinated in some way (for e
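
A minimal sketch of coordinating writes to a non-thread-safe output with a single lock. It is not Flink code: `Sink`, `BufferSink`, `LockedSink` and `LockedSinkDemo` are hypothetical names invented for illustration, with `Sink` standing in for something like `Collector[T]`.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a non-thread-safe output such as a Collector[T].
trait Sink[T] { def collect(record: T): Unit }

// Plain ArrayBuffer: uncoordinated concurrent appends may lose or corrupt records.
final class BufferSink[T] extends Sink[T] {
  val records = ArrayBuffer.empty[T]
  def collect(record: T): Unit = { records += record; () }
}

// Routes every write through one lock, so only one thread writes at a time.
final class LockedSink[T](underlying: Sink[T], lock: AnyRef) extends Sink[T] {
  def collect(record: T): Unit = lock.synchronized(underlying.collect(record))
}

object LockedSinkDemo {
  // Writes `perThread` records from each of `threads` threads; returns the record count.
  def writeConcurrently(threads: Int, perThread: Int): Int = {
    val lock = new Object
    val buffer = new BufferSink[Int]
    val guarded = new LockedSink[Int](buffer, lock)
    val pool = Executors.newFixedThreadPool(threads)
    for (t <- 0 until threads) {
      pool.execute(new Runnable {
        def run(): Unit =
          for (i <- 0 until perThread) guarded.collect(t * perThread + i)
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    // Read under the same lock so the writers' updates are visible here.
    lock.synchronized(buffer.records.size)
  }
}
```

A lock like this only helps if every writer goes through it, including the thread that normally owns the output.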

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Kostas Kloudas
Hi Salva and Yun, Yun is correct that the collector is not thread-safe, so writes should be guarded. In addition, a pattern that issues a request to a third-party multi-threaded library and registers a callback for the future does not play well with checkpointing. In your case, if a failure

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Salva Alcántara
Hi Yun, Thanks for your prompt and clear answer! Salva -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Salva Alcántara
Hi Kostas, Thanks for your further comments. I will take a look at the AsyncIO pattern. Regards, Salva

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Salva Alcántara
Would your comment still apply if I was using AbstractStreamOperator (passing its output when registering the callbacks) instead of a UDF? Maybe the situation changes if I use the Operator API instead...

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Kostas Kloudas
Hi Salva, Yes, the same applies to the Operator API, as the output is not thread-safe and there is no way of "checkpointing" the "in-flight" data without explicit handling. If you want to dig deeper, I would also recommend having a look at the source code of the AsyncWaitOperator to see how you co

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
I still need to get into the AsyncWaitOperator, but after taking a look at the Async I/O API, it seems that the normal use case is when you expect a result for each element in the input stream, so you register a callback together with a timeout for each input element. This is not exactly what my us

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
Hi, As Kostas has pointed out, the operator and UDF APIs are not thread-safe, and Flink always calls them from the same single Task thread. This also includes checkpointing state. Also as Kostas pointed out, the easiest way would be to try to use AsyncWaitOperator. If that’s not possible,
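
One way to respect the single-thread rule is to let callback threads only enqueue work, and have the owning thread perform the actual writes. The sketch below assumes nothing about Flink's own mailbox API: `Mailbox`, `submit`, `drain` and `MailboxDemo` are hypothetical names, and the thread pool merely stands in for the third-party library's threads.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical mailbox: callback threads only enqueue, one owner thread runs the actions.
final class Mailbox {
  private val pending = new ConcurrentLinkedQueue[() => Unit]()

  // Safe to call from any thread.
  def submit(action: () => Unit): Unit = { pending.add(action); () }

  // Must be called only from the owner thread; returns how many actions ran.
  def drain(): Int = {
    var ran = 0
    var next = pending.poll()
    while (next != null) {
      next()
      ran += 1
      next = pending.poll()
    }
    ran
  }
}

object MailboxDemo {
  def emitViaMailbox(callbacks: Int): List[String] = {
    val mailbox = new Mailbox
    val output = ArrayBuffer.empty[String] // mutated only by the calling thread
    val library = Executors.newFixedThreadPool(4) // stands in for the library's threads
    for (i <- 0 until callbacks) {
      library.execute(new Runnable {
        // The callback does not touch `output`; it only hands the write over.
        def run(): Unit = mailbox.submit(() => { output += s"match-$i"; () })
      })
    }
    library.shutdown()
    library.awaitTermination(10, TimeUnit.SECONDS)
    mailbox.drain() // the owner thread performs all writes here
    output.toList
  }
}
```

The order in which matches are emitted depends on when each callback fires, so the sketch makes no ordering promise.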

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
Many thanks for your detailed response Piotr, it helped a lot! BTW, I got similar comments from Arvid Heise here: https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the.

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
Glad that we could help :) Yes, Arvid’s response is spot on. Piotrek

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
BTW, is it possible to get the checkpoint lock within the processElement method of a ProcessFunction, or must I switch to the Operator API instead?

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
You must switch to the Operator API to access the checkpointing lock. This is by design: the Operator API is not stable (@PublicEvolving), which is why we were able to deprecate and remove `checkpointingLock` in Flink 1.10/1.11. Piotrek

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
Ok many thanks again!

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-14 Thread Salva Alcántara
Hi Piotr, Since my current process function already works well for me, except for the fact I don't have access to the mailbox executor, I have simply created a custom operator for injecting that: ``` class MyOperator(myFunction: MyFunction) extends KeyedCoProcessOperator(myFunction) { privat

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-05 Thread Salva Alcántara
Hi again Piotr, I have further considered the mailbox executor approach and I think it will not be enough for my purposes. Here is why: - My state consists of models created with a third party library - These models have their own state, which means that when I forward events in `ProcessElement1`

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-08 Thread Piotr Nowojski
Hi Salva, Can't you take into account the pending element that’s stuck somewhere in transit? Snapshot it as well and reprocess it during recovery. This is exactly what AsyncWaitOperator does. Piotrek

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-08 Thread Salva Alcántara
I agree with your point Piotrek, AsyncIO would handle all the pending data for me. However, the reason I did not want to use it is that, in my case, the callbacks are not always called in response to new data being sent to the third party lib. Indeed, the callback will be called rather uncomm

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Piotr Nowojski
Hi, With: > Can't you take into account the pending element that’s stuck somewhere in > transit? Snapshot it as well and reprocess it during recovery. This is > exactly what AsyncWaitOperator does. I didn’t mean for you to use AsyncWaitOperator, but what both Arvid and I suggested
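
The idea of snapshotting the pending elements and reprocessing them during recovery can be sketched without any Flink dependency. `InFlightTracker` and its methods are hypothetical names; in a real operator the snapshot would presumably be written to checkpointed state, and all calls are assumed to come from the single task thread.

```scala
import scala.collection.mutable

// Hypothetical tracker for elements handed to the library but not yet fully handled.
// Not thread-safe: assumed to be used from one thread only.
final class InFlightTracker[T] {
  private val pending = mutable.LinkedHashMap.empty[Long, T]
  private var nextId = 0L

  // Record an element before forwarding it; the returned id is used to acknowledge it.
  def register(element: T): Long = {
    val id = nextId
    nextId += 1
    pending(id) = element
    id
  }

  // Called once the library has fully handled the element.
  def complete(id: Long): Unit = { pending.remove(id); () }

  // What a checkpoint would persist: every element still in transit, in arrival order.
  def snapshot(): List[T] = pending.values.toList

  // After a failure: re-register the saved elements and return them with their new ids,
  // so the caller can forward each one to the library again.
  def restore(saved: List[T]): List[(Long, T)] =
    saved.map(element => register(element) -> element)
}
```

For example, after registering "a" and "b" and completing "a", `snapshot()` contains only "b", and restoring that list into a fresh tracker makes "b" pending again.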

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Salva Alcántara
Perfectly understood, thanks a lot for your reply/patience. I will take a look at AsyncWaitOperator and adapt from there if I really need that.

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Arvid Heise
I was wondering whether you could actually use AsyncWaitOperator in the following way. - Use a rather big timeout (so if callbacks usually take 1s, use 10). - Use UNORDERED mode. - Use a rather big queue size that would not cause any backpressure (you could just experiment with different setting

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Salva Alcántara
Sounds like a plan Arvid! Taking note of it, this is gold!