Hi all, I’m trying to apply a window operator over multiple streams (more than 2) and join these streams within the validity of the window. However, I have some questions about the time semantics using both the DataStream API and the Table API/SQL.
Let’s say we have 3 streams: A, B and C. Currently we have an A@0 (an A at timestamp 0), a B@5 and two C’s: C@6 and C@13. We would like to join these streams when they fall within a sliding window of size 10 and slide 5. Thus the first window W1[0,10) should contain A@0, B@5 and C@6. The second window W2[5,15) should contain B@5, C@6 and C@13. So only in the first window can all 3 streams be joined successfully. However, I’m not able to mimic this behaviour using the DataStream or Table API.

Using the DataStream API, joining multiple streams can be achieved by applying a first window to join stream A and stream B, and then applying a second window to join the result of the previous window with stream C, e.g.:

    streamA
        .join(streamB)
        .where(<key selector>).equalTo(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // <- window Wab
        .apply(new JoinFunction() {...})
        .join(streamC)
        .where(<key selector>).equalTo(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // <- window Wabc
        .apply(new JoinFunction() {...})

However, according to the documentation on Window Joins [1] (and debugging), the joined events from the first window (Wab) are assigned a new timestamp: the largest timestamp that still lies in the respective window, i.e. the time the window closes. Thus the result of joining A@0 and B@5 over the first window (Wab) will be AB@9. When joining with the C stream, AB@9 can be joined with both C@6 and C@13. This is not the behaviour I would like to obtain, since A happened at timestamp 0 and C@13 is more than 10 timestamps away.

Using the Table API or SQL, I think this can be solved using Interval Joins [2]. However, the windowing semantics seem to be different, as you need to pick one table (or stream) around which the interval is defined. Depending on which table the interval is applied to, different results are obtained.
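Coming back to the chained DataStream joins for a moment: to make the timestamp problem concrete, below is a minimal plain-Java sketch (no Flink dependency) of how I understand the sliding window assignment and the timestamp given to joined results. The class and method names (SlidingWindowSketch, windowStarts, joinedTimestamp, shareWindow) are made up for illustration, and the logic is my assumption about the assigner's behaviour based on [1], restricted to non-negative timestamps and a window offset of 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: mimics sliding window assignment
// for size 10 and slide 5, in the same abstract time units as the example.
class SlidingWindowSketch {
    static final long SIZE = 10;
    static final long SLIDE = 5;

    // Start timestamps of all windows [start, start + SIZE) containing ts.
    // Assumes ts >= 0 and a window offset of 0.
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % SLIDE);
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    // Timestamp assigned to a joined result: the largest timestamp
    // that still lies inside the window [windowStart, windowStart + SIZE).
    static long joinedTimestamp(long windowStart) {
        return windowStart + SIZE - 1;
    }

    // True if both timestamps fall into at least one common window.
    static boolean shareWindow(long t1, long t2) {
        List<Long> common = new ArrayList<>(windowStarts(t1));
        common.retainAll(windowStarts(t2));
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        // A@0 and B@5 meet in the window starting at 0, so the result is AB@9.
        long ab = joinedTimestamp(0);
        System.out.println("AB@" + ab);                               // AB@9
        // The chained join compares AB@9 with C@13: they share window [5,15).
        System.out.println("AB~C@13: " + shareWindow(ab, 13));        // true
        // Compared on its original timestamp, A@0 shares no window with C@13.
        System.out.println("A@0~C@13: " + shareWindow(0, 13));        // false
    }
}
```

The last two lines show the mismatch: once the joined event carries the window-end timestamp, it becomes eligible for windows that the original A@0 never belonged to.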
For example, let’s say we have 3 table versions of our previous streams, i.e. A, B and C, each with a time attribute 'ts'. Applying an interval around table A would result in something like:

    SELECT A.a, B.b, C.c
    FROM A, B, C
    WHERE A.x = B.x AND A.x = C.x AND
          A.ts BETWEEN B.ts - INTERVAL '5' MINUTE AND B.ts + INTERVAL '5' MINUTE AND
          A.ts BETWEEN C.ts - INTERVAL '5' MINUTE AND C.ts + INTERVAL '5' MINUTE

So if we want a window of 10, I think we split the interval into 5 minutes before and 5 minutes after? However, now A@0 is not in the interval of C@6. Applying an interval of 10 would solve this problem. However, if we apply an interval of 10 both before and after, but choose to fix the interval around B instead, we run into a different problem:

    SELECT A.a, B.b, C.c
    FROM A, B, C
    WHERE A.x = B.x AND A.x = C.x AND
          B.ts BETWEEN A.ts - INTERVAL '10' MINUTE AND A.ts + INTERVAL '10' MINUTE AND
          B.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE

In this case B@5 is in the interval of A@0 but also of C@13, so A@0 and C@13 end up joined even though they are more than 10 apart.

So my question is: how can I join multiple streams within a window so that the result behaves as if all the streams were joined in the same window? Should I write my own WindowOperator that assigns the smallest timestamp when two events are joined, instead of the time that the window closes?

Thanks in advance!

Kind regards,
Pieter

// code examples taken from [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[3] https://stackoverflow.com/a/50879029

-------------------------------------------------------------------------
Dr. Ir. Pieter Bonte
Ghent University - imec
IDLab
iGent Tower - Department of Information Technology
Technologiepark-Zwijnaarde 126, B-9052 Ghent, Belgium
T: +32 9 33 14938; T Secr: +32 (0)9 33 14900
F: +32 9 33 14899
E: pieter.bo...@ugent.be
W: IDLab.technology
W: IDLab.ugent.be