Hi,

As Kostas has pointed out, the operator and UDF APIs are not thread safe, and 
Flink always calls them from the same single Task thread. This also includes 
checkpointing state. As Kostas also pointed out, the easiest way would be to 
try to use AsyncWaitOperator. If that’s not possible, you can implement your 
custom logic based on its code.

>  So, in the end,
> the `ProcessElement1` method is basically forwarding the events to this
> library and registering a callback so that, when a match is detected, the
> CoProcessFunction can emit an output event. For achieving this, the callback
> relies on a reference to the `out: Collector[T]` parameter in
> `ProcessElement1`.

To achieve this, i.e. to emit from a different thread:

Pre Flink 1.10

In the past (before Flink 1.10, so including Flink 1.9), multi-threaded 
operators were supposed to acquire the so-called “checkpointingLock”. You can 
hold a reference to the output collector, but before emitting anything, you 
have to acquire `checkpointingLock`. Note that if you acquire it and don’t 
release it for some period of time, the whole Task will be blocked from making 
any progress.
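
A minimal plain-Java sketch of that locking discipline, in case it helps. It 
does not use any Flink classes: `CheckpointLockSketch`, `processElement`, 
`onMatch` and the `out` list are hypothetical stand-ins for the operator, the 
task thread's per-record work, the library callback and the collector. The 
only point it illustrates is that every emit, from either thread, happens 
while holding the same lock, and that the lock is held only briefly.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the pre-1.10 pattern; none of these names are Flink APIs.
class CheckpointLockSketch {
    // Hypothetical stand-in for the lock the task thread holds while it
    // processes records and takes checkpoints.
    private final Object checkpointingLock = new Object();
    // Hypothetical stand-in for the `out` collector captured by the callback.
    private final List<String> out = new ArrayList<>();

    // Runs on the "task thread": per-record work happens under the lock.
    void processElement(String element) {
        synchronized (checkpointingLock) {
            out.add("forwarded:" + element);
        }
    }

    // Runs on the library's callback thread: take the same lock before
    // emitting, and release it quickly so the task thread is not stalled.
    void onMatch(String match) {
        synchronized (checkpointingLock) {
            out.add("match:" + match);
        }
    }

    // Copy of everything emitted so far, also read under the lock.
    List<String> snapshot() {
        synchronized (checkpointingLock) {
            return new ArrayList<>(out);
        }
    }

    // Drives both threads concurrently and returns everything that was emitted.
    static List<String> run(int elements, int matches) {
        CheckpointLockSketch op = new CheckpointLockSketch();
        Thread callbackThread = new Thread(() -> {
            for (int i = 0; i < matches; i++) {
                op.onMatch("m" + i);
            }
        });
        callbackThread.start();
        for (int i = 0; i < elements; i++) {
            op.processElement("e" + i);
        }
        try {
            callbackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 10).size() + " records emitted");
    }
}
```

The interleaving of the two kinds of records is not deterministic, but no 
record is lost and the list is never modified by two threads at once.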

Flink 1.10+

In Flink 1.10 `checkpointLock` was deprecated and will be removed in Flink 
1.11. It is replaced by registering asynchronous runnable callbacks, “mails”, 
that are executed by the task thread. So if you want to:
a) emit results produced by a custom thread
b) modify the operator’s state as a result of some work done by a custom thread
then in both cases the action has to be done inside a “mail”. For example, the 
pattern for a) is:

1. External thread creates record R1 to emit
2. External thread creates a “mail” to emit record R1 and enqueues it into 
the mailbox
3. Task’s thread picks up the mail and executes its code, which emits 
record R1 from the Task thread
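
The three steps above can be sketched in plain Java roughly as follows. This 
is only an illustration of the idea and not Flink's mailbox API: 
`MailboxSketch`, `emitLater`, `stopLater` and the `out` list are hypothetical 
names, and the mailbox is modelled as a simple `BlockingQueue<Runnable>` 
drained by a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java illustration of the mailbox idea; none of these names are Flink APIs.
class MailboxSketch {
    // Any thread may enqueue mails; only the task thread executes them.
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    // Hypothetical stand-in for the collector. It is touched only by the
    // task thread, so it needs no synchronization.
    private final List<String> out = new ArrayList<>();
    // Also read and written only by the task thread (via a mail).
    private boolean running = true;

    // Step 2: wrap the emit in a mail and enqueue it.
    void emitLater(String record) {
        mailbox.add(() -> out.add(record));
    }

    // Ask the task loop to finish, again via a mail.
    void stopLater() {
        mailbox.add(() -> running = false);
    }

    // Step 3: the task thread takes mails one by one and runs them.
    List<String> runTaskLoop() throws InterruptedException {
        while (running) {
            mailbox.take().run();
        }
        return out;
    }

    // The calling thread plays the task thread; a second thread plays the library.
    static List<String> run(int records) {
        MailboxSketch task = new MailboxSketch();
        Thread external = new Thread(() -> {
            for (int i = 1; i <= records; i++) {
                String record = "R" + i;   // Step 1: create the record
                task.emitLater(record);    // Step 2: enqueue the mail
            }
            task.stopLater();
        });
        external.start();
        try {
            return task.runTaskLoop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The external thread never touches `out` directly; it only hands over a 
closure. The same shape applies to b): the state update goes into the mail 
body instead of the emit.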

For both of those patterns, please take a look at the AsyncWaitOperator code in 
the respective Flink versions. Just keep in mind that if you implement it 
using `checkpointingLock`, this will not work anymore in Flink 1.11.

Piotrek

> On 13 Feb 2020, at 10:56, Salva Alcántara <salcantara...@gmail.com> wrote:
> 
> I still need to get into the AsyncWaitOperator, but after taking a look at
> the Async I/O API, it seems that the normal use case is when you expect a
> result for each element in the input stream, so you register a callback
> together with a timeout for each input element. This is not exactly what my
> use case requires. In particular, when I send an event to the third party
> library, I might get a result...or not. The library is used for detecting
> certain patterns, so it is not as when you are querying a database, where
> you expect a result within a given time frame for each input element. In my
> case, it is more the other way around, most of the time you will not be
> expecting any outcome (think of anomaly detection). What I need is a way to
> collect the result (if any) from my third party library in my
> ProcessFunction, knowing that these outcomes will be exceptional compared
> with the cardinality of the input stream. After giving some extra thoughts,
> I don't know if the Async I/O pattern really suits my needs...
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
