Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Salva Alcántara
Sounds like a plan Arvid! Taking note of it, this is gold!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Arvid Heise
I was wondering whether you could actually use AsyncWaitOperator in the
following way.

- Use a rather big timeout (so if callbacks usually take 1s, use 10s).
- Use UNORDERED mode.
- Use a rather big queue size that would not cause any backpressure (you
could just experiment with different settings).

Then you would probably end up with the operator that you would otherwise
have to implement manually anyway:
- Requests come in a specific order, and that order is retained when calling
the external library.
- Results are returned immediately (depending on your watermark settings),
resulting in no additional latency (because of UNORDERED).
- The big timeout guarantees that you will not dismiss an input too quickly
if its callback takes longer than usual. It will, however, clean up all
elements from state that have received no callback after the given time.
- The big queue size avoids backpressure caused by many pending requests
without a response. Say you have 100 requests per second and a timeout of
10s: a queue size of 1000 would then allow all incoming requests to be
processed almost instantly (ignoring the actual callbacks, which reduce the
needed queue size; you said they are rather rare events).
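
The queue-size estimate above is simply arrival rate times timeout. A minimal
Scala sketch of that arithmetic follows; `QueueSizing` and `requiredCapacity`
are made-up names for illustration, and the calculation assumes the worst case
in which no callback completes before its timeout.

```scala
object QueueSizing {
  /** Worst-case number of simultaneously pending requests: rate times timeout. */
  def requiredCapacity(requestsPerSecond: Long, timeoutSeconds: Long): Long =
    requestsPerSecond * timeoutSeconds

  def main(args: Array[String]): Unit = {
    // 100 requests per second with a 10 s timeout, as in the example above.
    val capacity = requiredCapacity(requestsPerSecond = 100L, timeoutSeconds = 10L)
    println(s"queue capacity needed: $capacity")
  }
}
```

Any capacity at or above this bound should keep the queue from filling up at
the assumed rate; a smaller value may still work if load is lower in practice.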





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Salva Alcántara
Perfectly understood, thanks a lot for your reply and patience. I will take a
look at AsyncWaitOperator and adapt from there if I really need that.





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Piotr Nowojski
Hi,

With:

> Can’t you take into account the pending element that’s stuck somewhere in
> transit? Snapshot it as well and reprocess it during recovery? This is
> exactly what AsyncWaitOperator is doing.

I didn’t mean that you have to use AsyncWaitOperator itself, but rather what
both Arvid and I suggested previously:

> Also as Kostas pointed out, the easiest way would be to try to use
> AsyncWaitOperator. If that’s not possible, you can implement your custom
> logic based on its code.

You can copy and adapt the AsyncWaitOperator logic inside your custom
operator. You don’t have to use the operator itself if you have special
requirements; you can implement your own custom logic. Specifically, I meant
that you could mimic the field

org.apache.flink.streaming.api.operators.async.AsyncWaitOperator#queue

and how it is used during state snapshotting and recovery.
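
To illustrate the idea of snapshotting in-flight elements, here is a minimal,
Flink-free Scala sketch. `PendingTracker` and its methods are hypothetical
names, not the actual AsyncWaitOperator code, and the sketch assumes that
re-submitting an element to the library after recovery is acceptable for your
models.

```scala
import scala.collection.mutable

/** Hypothetical stand-in for an operator that tracks in-flight elements. */
final class PendingTracker[A](forward: A => Unit) {
  // Elements handed to the library whose outcome is not yet known.
  private val pending = mutable.Queue.empty[A]

  /** Called for every incoming element: remember it, then pass it on. */
  def process(element: A): Unit = {
    pending.enqueue(element)
    forward(element)
  }

  /** Called once the library has answered or the timeout has expired. */
  def complete(element: A): Unit = {
    pending.dequeueFirst(_ == element)
    ()
  }

  /** Checkpoint: copy the in-flight elements into the snapshot. */
  def snapshot(): List[A] = pending.toList

  /** Recovery: re-submit everything that was in flight at checkpoint time. */
  def restore(snapshot: List[A]): Unit = snapshot.foreach(process)
}
```

In a real operator the snapshot would be written to operator state instead of
being returned as a list, and `complete` would be driven by the callback or by
a timer.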

Piotrek




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-08 Thread Salva Alcántara
I agree with your point Piotrek, AsyncIO would handle all the pending data
for me. However, the reason why I did not want to use it is that in my
case, the callbacks are not always called in response to new data being sent
to the third party lib. Indeed, the callback will be called rather
rarely (since in my case it will mean that an anomaly has been
detected). This means that if I go with AsyncIO I will need to set up a max
timeout for every element, when only a few of them will actually invoke the
callback (i.e., produce any data in response). This seems rather drastic
because it will probably add too much latency unnecessarily, but I agree
that maybe there is no other way if I need exactly-once guarantees.





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-08 Thread Piotr Nowojski
Hi Salva,

Can’t you take into account the pending element that’s stuck somewhere in
transit? Snapshot it as well and reprocess it during recovery? This is exactly
what AsyncWaitOperator is doing.

Piotrek




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-05 Thread Salva Alcántara
Hi again Piotr,

I have further considered the mailbox executor approach and I think it will
not be enough for my purposes. Here is why:

- My state consists of models created with a third party library
- These models have their own state, which means that when I forward events
in `processElement1` to these models, the model's state will be updated
accordingly.

So, what would happen if:

- A new element E is processed in `processElement1` and sent to the third
party library model
- A checkpoint is taken, in particular snapshotting all the library models
in use
- The element E that was sent to the library is expected to generate an
output O when the callback is called, but a failure happens before
that
- The application recovers from the snapshot and continues processing
elements, but the callback generating the expected output O has been lost by
now, so that output will be lost

Considering the above case, I realize that the only option for me might
be to rely on AsyncIO. However, this is far from ideal because I am not
expecting an output result for each element I send to my models. I could use
a timeout, but that may slow down processing as AsyncIO has a limited queue
of "active" elements. Also, most of the time, I am not expecting a result
back at all from my models (callbacks will be invoked only a few times since
my models are detecting anomalies).

In your opinion, what would be the best approach for handling this use case?





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-14 Thread Salva Alcántara
Hi Piotr,

Since my current process function already works well for me, except for the
fact I don't have access to the mailbox executor, I have simply created a
custom operator for injecting that:

```
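import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator

// Custom operator whose only job is to hand the mailbox executor to the function.
class MyOperator(myFunction: MyFunction)
  extends KeyedCoProcessOperator(myFunction)
{

  // Created lazily, on first use inside open().
  private lazy val mailboxExecutor = getContainingTask
    .getMailboxExecutorFactory
    .createExecutor(getOperatorConfig.getChainIndex)

  override def open(): Unit = {
    super.open()
    // MyFunction is expected to expose a settable `mailboxExecutor` field.
    userFunction.asInstanceOf[MyFunction].mailboxExecutor = mailboxExecutor
  }
}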
```

This way I can send mails just fine. In the main application, I use it like
this:

```
.transform("wrapping my function with my operator", new MyOperator(new MyFunction()))
```

So far everything looks good to me, but if you see problems or know a better
way, it would be great to hear your thoughts on this again. In particular,
the way of getting access to the mailbox executor is a bit clumsy...





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
Ok many thanks again!





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
You must switch to the Operator API to access the checkpointing lock. This was
by design: the Operator API is not stable (@PublicEvolving), which is why we
were able to deprecate and remove `checkpointingLock` in Flink 1.10 and 1.11,
respectively.

Piotrek




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
BTW, is it possible to get the checkpoint lock within the processElement
method of a ProcessFunction, or must I switch to the Operator API instead?





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
Glad that we could help :) Yes, Arvid’s response is spot on.

Piotrek




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
Many thanks for your detailed response Piotr, it helped a lot!

BTW, I got similar comments from Arvid Heise here:
https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the.





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
Hi,

As Kostas has pointed out, the operator’s and UDF’s APIs are not thread-safe,
and Flink always calls them from the same, single Task thread. This also
includes checkpointing state. Also as Kostas pointed out, the easiest way would
be to try to use AsyncWaitOperator. If that’s not possible, you can implement
your custom logic based on its code.

>  So, in the end,
> the `ProcessElement1` method is basically forwarding the events to this
> library and registering a callback so that, when a match is detected, the
> CoProcessFunction can emit an output event. For achieving this, the callback
> relies on a reference to the `out: Collector[T]` parameter in
> `ProcessElement1`.

To achieve this, i.e. to emit from a different thread:

Pre Flink 1.10

In the past (before Flink 1.10, so up to and including Flink 1.9),
multi-threaded operators were supposed to acquire the so-called
“checkpointingLock”. You can hold a reference to the output collector, but
before emitting anything you have to acquire `checkpointingLock`. Note that if
you acquire it and don’t release it for some period of time, the whole Task
will be blocked from making any progress.
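
As a rough, Flink-free illustration of the lock-based pattern, the Scala
sketch below guards a non-thread-safe buffer with a shared monitor. The names
`CheckpointLockSketch`, `checkpointLock`, `output` and `emit` are stand-ins
invented for this example; in a real pre-1.10 operator the lock would be the
one provided by the containing task, not a freshly created object.

```scala
import scala.collection.mutable.ArrayBuffer

object CheckpointLockSketch {
  // Stand-in for the checkpointing lock (assumption: any shared monitor).
  private val checkpointLock = new Object
  // Stand-in for the non-thread-safe output collector.
  private val output = ArrayBuffer.empty[String]

  /** Emits a record; callable from any thread because the lock is held. */
  def emit(record: String): Unit = checkpointLock.synchronized {
    output += record
    ()
  }

  /** Reads what has been emitted so far, also under the lock. */
  def emitted: List[String] = checkpointLock.synchronized(output.toList)

  def main(args: Array[String]): Unit = {
    // Four "library" threads, each emitting 100 records through the lock.
    val threads = (1 to 4).map { i =>
      new Thread(() => (1 to 100).foreach(n => emit(s"t$i-$n")))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(emitted.size)
  }
}
```

The body of `emit` is kept deliberately short, since everything else that
needs the lock is stalled while a thread holds it.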

Flink 1.10+

In Flink 1.10, `checkpointingLock` was deprecated, and it will be removed in
Flink 1.11. It is replaced by registering asynchronous runnable callbacks,
“mails”, that are executed by the task thread. So if you:
a) want to emit results produced by a custom thread
b) want to modify the operator’s state as a result of some work done by a
custom thread
then in both cases this has to be done inside the “mail” action. For example,
the pattern for a) is:

1. External thread creates record R1 to emit
2. External thread creates a “mail” to emit record R1 and enqueues it into
the mailbox
3. Task’s thread picks up the mail and executes its code, and that code
emits the record R1 from the Task thread
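
The three steps above can be sketched without Flink using a plain blocking
queue as the mailbox. This is only an illustration of the pattern under that
assumption: `MailboxSketch`, `enqueueEmit` and `runNextMail` are hypothetical
names and not Flink's mailbox API.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object MailboxSketch {
  // The "mailbox": a thread-safe queue of actions to run on the task thread.
  private val mailbox = new LinkedBlockingQueue[Runnable]()
  // Stand-in for the collector; only the task thread ever touches it.
  private val output = ArrayBuffer.empty[String]

  /** Steps 1 and 2: an external thread wraps the emission into a mail. */
  def enqueueEmit(record: String): Unit = {
    val mail: Runnable = () => { output += record; () }
    mailbox.put(mail)
  }

  /** Step 3: the task thread takes the next mail and runs it. */
  def runNextMail(): Unit = mailbox.take().run()

  /** Records emitted so far; meant to be read from the task thread. */
  def emitted: List[String] = output.toList

  def main(args: Array[String]): Unit = {
    val external = new Thread(() => enqueueEmit("R1"))
    external.start()
    runNextMail() // blocks until the mail arrives, then emits on this thread
    external.join()
    println(emitted)
  }
}
```

Because the collector is only ever touched inside a mail, no lock around the
emission is needed.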

For both of those patterns, please take a look at the AsyncWaitOperator code in 
the respective Flink versions. Just keep in mind, that if you implement it 
using `checkpointingLock`, this will not work anymore in Flink 1.11.

Piotrek




Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
I still need to get into the AsyncWaitOperator, but after taking a look at
the Async I/O API, it seems that the normal use case is when you expect a
result for each element in the input stream, so you register a callback
together with a timeout for each input element. This is not exactly what my
use case requires. In particular, when I send an event to the third party
library, I might get a result...or not. The library is used for detecting
certain patterns, so it is not like querying a database, where
you expect a result within a given time frame for each input element. In my
case, it is more the other way around: most of the time you will not be
expecting any outcome (think of anomaly detection). What I need is a way to
collect the result (if any) from my third party library in my
ProcessFunction, knowing that these outcomes will be exceptional compared
with the cardinality of the input stream. After giving it some extra thought,
I don't know if the Async I/O pattern really suits my needs...





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Kostas Kloudas
Hi Salva,

Yes, the same applies to the Operator API, as the output is not
thread-safe and there is no way of "checkpointing" the "in-flight"
data without explicit handling.
If you want to dig deeper, I would also recommend having a look at
the source code of the AsyncWaitOperator to see how you could bypass
these limitations with a custom operator. In fact, you may also be
able to optimise your operator for your specific use case.

Cheers,
Kostas



Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Salva Alcántara
Would your comment still apply if I was using AbstractStreamOperator (passing
its output when registering the callbacks) instead of a UDF? Maybe the
situation changes if I use the Operator API instead...





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Salva Alcántara
Hi Kostas,

Thanks for your further comments. I will take a look at the AsyncIO pattern.

Regards,

Salva





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Salva Alcántara
Hi Yun,

Thanks for your prompt and clear answer!

Salva





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Kostas Kloudas
Hi Salva and Yun,

Yun is correct that the collector is not thread-safe, so writes
should be guarded.

In addition, a pattern that issues a request to a 3rd party
multi-threaded library and registers a callback for the future does
not play well with checkpointing. In your case, if a failure happens,
the data (or requests) that are "in-flight" are not part of any
checkpoint, so you may have data loss. Your use case seems better
suited to the AsyncIO pattern [1] supported by Flink, and it may make
sense to use that for your project.

I hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html



Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Yun Gao
  Hi Salva,

As far as I know,
   1. `out: Collector[T]` does not support multi-threaded access, namely
there can be only one thread writing records at a time. If there are multiple
threads using `out`, the access needs to be coordinated in some way (for
example, use a lock, or use a queue to cache records and let a single thread
output them with `out`).
   2. With the current implementation, `out` would not be recreated. However,
I think that is implementation-dependent. If the processing logic still
happens in the `processElement` method, is it possible to always use the `out`
object passed into the method?
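
The queue variant mentioned in point 1 could look roughly like the following
Flink-free Scala sketch. `CallbackBuffer`, `onMatch` and `drainTo` are
hypothetical names for illustration; the idea is that library callbacks only
enqueue, while a single thread drains the queue and emits.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

/** Hypothetical buffer between library callbacks and the single emitting thread. */
final class CallbackBuffer[A <: AnyRef] {
  private val results = new ConcurrentLinkedQueue[A]()

  /** Safe to call from any library thread: it only enqueues the result. */
  def onMatch(result: A): Unit = {
    results.add(result)
    ()
  }

  /** Called by the single emitting thread; returns how many records it emitted. */
  def drainTo(emit: A => Unit): Int = {
    var count = 0
    var next = results.poll()
    while (next != null) {
      emit(next)
      count += 1
      next = results.poll()
    }
    count
  }
}
```

Draining at the start of each `processElement` call, with `emit` bound to the
`out` passed into that call, would also address point 2. Note that buffered
results then wait for the next input element, and that the buffer itself is
not part of any checkpoint.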


   Best, 
   Yun




--
From:Salva Alcántara 
Send Time:2020 Feb. 12 (Wed.) 13:33
To:user 
Subject:Using multithreaded library within ProcessFunction with callbacks 
relying on the out parameter

I am working on a `CoProcessFunction` that uses a third party library for
detecting certain patterns of events based on some rules. So, in the end,
the `processElement1` method is basically forwarding the events to this
library and registering a callback so that, when a match is detected, the
CoProcessFunction can emit an output event. For achieving this, the callback
relies on a reference to the `out: Collector[T]` parameter in
`processElement1`.

Having said that, I am not sure whether this use case is well-supported by
Flink, since:

1. There might be multiple threads spawned by the third party library (I do
not have any control over the number of threads spawned; this is decided
by the library)
2. I am not sure whether `out` might be recreated or something by Flink at
some point, invalidating the references in the callbacks, making them crash

So far I have not observed any issues, but I have only run my program at
small scale. It would be great to hear from the experts whether my approach is
valid or not.

PS: Also posted in
https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the


