It's well known that Flink does not provide any guarantees on the order in which a CoProcessFunction (or one of its variants) processes its two inputs [1]. I wonder, then, what the current best practice/recommended approach is for cases where one needs deterministic results in the presence of:

1. A control stream
2. An event/data stream

Let's consider event-time semantics: both streams have timestamps, and we want to implement "temporal join" semantics where the input events are controlled based on the latest control signals received before them, i.e., the ones "active" when the events happened. For simplicity, let's assume that all events are received in order, so that the only source of non-determinism is the one introduced by the CoProcessFunction itself.

I'm considering the following options:

1. Buffer events inside the CoProcessFunction for some time, while saving all the control signals in state (indexed by time).
2. Same as option 1, but pre-buffering the event/data stream before the CoProcessFunction.
3. Similar to option 2, but considering both streams together by multiplexing them into one heterogeneous stream, pre-sorted to guarantee the global ordering of the events from the two different sources. Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a ProcessFunction[Either[IN1, IN2], OUT], which by construction will process the data in order and hence produce deterministic results.

Essentially, all the strategies amount to introducing a "minimum amount of delay" to guarantee deterministic processing, which brings me to the following question:

* How can I get an estimate for the out-of-order-ness bound that a CoProcessFunction can introduce? Is that even possible in the first place? I guess this mostly depends on the sources of the two streams and the relative ratio of records read. For simplicity, we can consider a Kafka source for both input streams...

On a related note, the "temporal join" seems to be a well-documented and solved problem in the SQL API [2-3], but the problem is not even mentioned in the docs for the DataStream API. I guess I can copy/adapt the corresponding code. E.g., for the "point-in-time" part I can consider a TreeMap data structure as used in [3].
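
To make option 1 concrete, here is a minimal plain-Java sketch of the logic I have in mind. It has no Flink dependency, so it is not a real CoProcessFunction, and the names (TemporalJoinSketch, onControl, onEvent, onWatermark) are made up for illustration. In an actual job I assume onWatermark would correspond to an event-time timer firing in onTimer, and that the two TreeMaps would have to live in keyed state. I also assume that a control signal with the same timestamp as an event counts as "active" for it (floorEntry rather than lowerEntry), and that no record arrives with a timestamp at or below a watermark already seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch only: buffers events and joins them against time-indexed control signals. */
class TemporalJoinSketch {
    // Control signals indexed by their event timestamp.
    private final TreeMap<Long, String> controls = new TreeMap<>();
    // Events buffered by timestamp until the watermark passes them.
    private final TreeMap<Long, List<String>> bufferedEvents = new TreeMap<>();
    // Stand-in for the operator's output collector.
    private final List<String> output = new ArrayList<>();

    /** Would be called from processElement1 (control stream) in a real job. */
    void onControl(long timestamp, String signal) {
        controls.put(timestamp, signal);
    }

    /** Would be called from processElement2 (event stream) in a real job. */
    void onEvent(long timestamp, String event) {
        bufferedEvents.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(event);
    }

    /** Called once both inputs are assumed complete up to (and including) the watermark. */
    void onWatermark(long watermark) {
        // View of all buffered events with timestamp <= watermark, in timestamp order.
        Map<Long, List<String>> ready = bufferedEvents.headMap(watermark, true);
        for (Map.Entry<Long, List<String>> entry : ready.entrySet()) {
            // Point-in-time lookup: latest control signal at or before the event timestamp.
            Map.Entry<Long, String> active = controls.floorEntry(entry.getKey());
            String signal = (active == null) ? "none" : active.getValue();
            for (String event : entry.getValue()) {
                output.add(event + "@" + entry.getKey() + "->" + signal);
            }
        }
        // Clearing the view removes the emitted events from the buffer.
        ready.clear();

        // Prune control signals that can no longer be the "active" one for any future event.
        Long lastActive = controls.floorKey(watermark);
        if (lastActive != null) {
            controls.headMap(lastActive, false).clear();
        }
    }

    List<String> output() {
        return output;
    }
}
```

With this structure the result only depends on the timestamps, not on the order in which the two inputs were interleaved before the watermark: feeding e1 (t=15), control A (t=10), control B (t=20), e2 (t=25) in any arrival order and then calling onWatermark(30) yields e1 joined with A and e2 joined with B. The price is the delay until the watermark passes each event, which is exactly the "minimum amount of delay" I am asking about.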

I have also come across a new (still WIP) BinarySortedState [4], which would improve performance w.r.t. the current RocksDB state backend.

References:

[1] https://stackoverflow.com/questions/51628037/facing-race-condition-in-flink-connected-stream-in-apache-flink

[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join

[3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies

[4] https://www.slideshare.net/FlinkForward/introducing-binarysortedmultimap-a-new-flink-state-primitive-to-boost-your-application-performance