It is well known that Flink does not provide any guarantees on the order in
which a CoProcessFunction (or one of its several variants) processes its
two inputs [1]. I wonder, then, what the current best practice/recommended
approach is for cases where one needs deterministic results in the presence of:

1. A control stream
2. An event/data stream

Let's consider event-time semantics; both streams have timestamps, and we
want to implement "temporal join" semantics where the input events are
controlled based on the latest control signals received before them, i.e.,
the ones "active" when the events happened. For simplicity, let's assume
that all events are received in order, so that the only source of
non-determinism is the one introduced by the CoProcessFunction itself.

I'm considering the following options:

1. Buffer events inside the CoProcessFunction for some time, while saving
all the control signals in state (indexed by time).
2. Same as option 1, but pre-buffer the event/data stream before it reaches
the CoProcessFunction.
3. Similar to the above, but treat both streams together by multiplexing
them into one heterogeneous stream, which would be pre-sorted in order to
guarantee the global ordering of the events from the two different sources.
Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
ProcessFunction[Either[IN1, IN2], OUT], which by construction will process
the data in order and hence produce deterministic results.
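
To make the buffering idea behind options 1 and 3 concrete, here is a
minimal sketch in plain Java (no Flink dependency). SortedMerger, Tagged
and advanceWatermark are hypothetical names of mine, not Flink API. It
assumes each input reports its own watermark, that a watermark W means "no
more records with timestamp <= W", and that a control signal is ordered
before an event carrying the same timestamp (that tie-break is an
assumption, not something Flink prescribes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper, not part of Flink: orders two timestamped inputs deterministically. */
class SortedMerger {

    /** One record from either input; isControl tells the two sides apart. */
    static final class Tagged {
        final long timestamp;
        final boolean isControl;
        final String payload;
        final long seq; // arrival order, used only to keep remaining ties stable

        Tagged(long timestamp, boolean isControl, String payload, long seq) {
            this.timestamp = timestamp;
            this.isControl = isControl;
            this.payload = payload;
            this.seq = seq;
        }
    }

    // Order by timestamp, then control before event (assumed tie-break), then arrival order.
    private final PriorityQueue<Tagged> buffer = new PriorityQueue<>((a, b) -> {
        if (a.timestamp != b.timestamp) return Long.compare(a.timestamp, b.timestamp);
        if (a.isControl != b.isControl) return a.isControl ? -1 : 1;
        return Long.compare(a.seq, b.seq);
    });

    private long controlWatermark = Long.MIN_VALUE;
    private long eventWatermark = Long.MIN_VALUE;
    private long nextSeq = 0;

    /** Buffers a record from either side; nothing is emitted yet. */
    void add(long timestamp, boolean isControl, String payload) {
        buffer.add(new Tagged(timestamp, isControl, payload, nextSeq++));
    }

    /** Advances one side's watermark and returns the records that are now safe to process. */
    List<Tagged> advanceWatermark(boolean controlSide, long watermark) {
        if (controlSide) {
            controlWatermark = Math.max(controlWatermark, watermark);
        } else {
            eventWatermark = Math.max(eventWatermark, watermark);
        }
        // Only records at or below BOTH watermarks can no longer be overtaken.
        long combined = Math.min(controlWatermark, eventWatermark);
        List<Tagged> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= combined) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

With this shape the delay is not a fixed guess: a record is held back only
until the slower of the two inputs has advanced its watermark past it. In
an actual job the buffer would have to live in Flink state rather than in
a heap field, which this sketch does not attempt.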

Essentially, all the strategies amount to introducing a "minimum amount of
delay" to guarantee the deterministic processing, which brings me to the
following question:

* How can one estimate the out-of-orderness bound that a
CoProcessFunction can introduce? Is that even possible in the first place?
I guess this mostly depends on the sources of the two streams and the
relative rate at which records are read from each. For simplicity, we can
assume a Kafka source for both input streams.

On a related note, the "temporal join" seems to be a well-documented and
solved problem in the SQL API [2-3], but the problem is not even mentioned
within the docs for the DataStream API. I guess I can copy/adapt the
corresponding code. E.g., for the "point-in-time" part I can consider a
TreeMap data structure as used in [3]. I have also come across a new (still
WIP) BinarySortedState [4] which would improve performance w.r.t. the
current RocksDB state backend.
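
For the "point-in-time" part, the lookup I have in mind would look roughly
like the sketch below, written against java.util.TreeMap only.
ControlHistory, activeAt and pruneUpTo are hypothetical names. I am
assuming that a control signal with the same timestamp as an event counts
as active for it (floorEntry); if "before" should be strict, lowerEntry
would be the call to use instead.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper, not part of Flink: control signals indexed by event time. */
class ControlHistory<C> {
    private final TreeMap<Long, C> byTimestamp = new TreeMap<>();

    /** Records a control signal that becomes active at the given timestamp. */
    void put(long timestamp, C control) {
        byTimestamp.put(timestamp, control);
    }

    /** Returns the latest control with timestamp <= eventTimestamp, or null if there is none. */
    C activeAt(long eventTimestamp) {
        Map.Entry<Long, C> entry = byTimestamp.floorEntry(eventTimestamp);
        return entry == null ? null : entry.getValue();
    }

    /**
     * Drops controls that can no longer be active for any event after the watermark,
     * keeping the most recent one at or before it.
     */
    void pruneUpTo(long watermark) {
        Long keep = byTimestamp.floorKey(watermark);
        if (keep != null) {
            byTimestamp.headMap(keep, false).clear();
        }
    }

    int size() {
        return byTimestamp.size();
    }
}
```

Pruning on the watermark keeps the map from growing without bound. How
this maps onto a particular state backend is a separate question that the
sketch leaves open.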

References:

[1]
https://stackoverflow.com/questions/51628037/facing-race-condition-in-flink-connected-stream-in-apache-flink
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies
[4]
https://www.slideshare.net/FlinkForward/introducing-binarysortedmultimap-a-new-flink-state-primitive-to-boost-your-application-performance
