Hi all,

I’m trying to apply a window operator over multiple streams (more than 2) and
join these streams within the extent of a single window. However, I have some
questions about the time semantics in both the DataStream API and the Table
API/SQL.

Let's say we have 3 streams: an A, a B and a C stream. Currently we have an A@0
(an A at timestamp 0), a B@5 and two C’s: C@6 and C@13.
We would like to join these streams when they fall within a sliding window of 
size 10 and slide 5.
Thus the first window W1[0,10) should contain A@0, B@5 and C@6.
The second window W2[5,15) should contain B@5, C@6 and C@13.
So only in the first window can we successfully join all 3 streams.
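To make the window assignment concrete, here is a small plain-Java sketch (not Flink code; SlidingWindows and its methods are hypothetical names for illustration). It assumes zero offset and window starts that are multiples of the slide, which is how I understand Flink's sliding event-time assigner to work, computes the starts of all windows [start, start + size) containing a timestamp, and checks whether a set of events shares at least one window.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: which sliding windows [start, start + size) contain a given timestamp. */
class SlidingWindows {
    // Last window start = timestamp rounded down to a multiple of the slide (zero offset);
    // earlier starts are added while the window still covers the timestamp.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // True if at least one sliding window contains every given timestamp.
    static boolean shareWindow(long size, long slide, long... ts) {
        for (long start : windowStarts(ts[0], size, slide)) {
            boolean all = true;
            for (long t : ts) {
                if (t < start || t >= start + size) {
                    all = false;
                    break;
                }
            }
            if (all) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(13, 10, 5));      // [10, 5]
        System.out.println(shareWindow(10, 5, 0, 5, 6));  // true: W1[0,10)
        System.out.println(shareWindow(10, 5, 0, 5, 13)); // false: no common window
    }
}
```

With size 10 and slide 5, A@0, B@5 and C@6 share W1[0,10), while A@0, B@5 and C@13 share no window at all.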

However, I’m not able to mimic this behaviour using the DataStream or Table API.


Using the DataStream API, joining multiple streams can be achieved by first
applying a window to join stream A with stream B, and then applying a second
window to join the result of the first window with stream C, e.g.:

streamA
  .join(streamB)
    .where(<key selector>).equalTo(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // <- window Wab
    .apply(new JoinFunction() {...})
  .join(streamC)
    .where(<key selector>).equalTo(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // <- window Wabc
    .apply(new JoinFunction() {...});

However, according to the documentation on Window Joins [1] (and confirmed by
debugging), the joined events from the first window (Wab) are assigned a new
timestamp: the largest timestamp that still lies in the respective window, i.e.
the last timestamp before the window closes.
Thus the result of joining A@0 and B@5 over the first window (Wab) will be
AB@9. When joining with the C stream, AB@9 can be joined with both C@6 and
C@13. This is not the behaviour I would like to obtain, since A happened at
timestamp 0, and C@13 is more than 10 time units away.
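The effect of this re-stamping can be reproduced without Flink. The following plain-Java sketch (CascadedWindowJoin is a made-up name; it ignores keys, watermarks and lateness) joins timestamps per sliding window, stamps each result with the window end minus one as described above, and then feeds the AB results into a second window join with the C events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Plain-Java simulation (not Flink code) of two cascaded sliding-window joins.
 * Every join result is stamped with the last timestamp of its window (end - 1).
 */
class CascadedWindowJoin {
    static final long SIZE = 10, SLIDE = 5;

    // Starts of all sliding windows [s, s + SIZE) that contain ts.
    static List<Long> starts(long ts) {
        List<Long> out = new ArrayList<>();
        for (long s = ts - Math.floorMod(ts, SLIDE); s > ts - SIZE; s -= SLIDE) {
            out.add(s);
        }
        return out;
    }

    // One window join: returns {leftTs, rightTs, resultTs} per match and per window.
    static List<long[]> windowJoin(List<Long> left, List<Long> right) {
        List<long[]> out = new ArrayList<>();
        for (long l : left) {
            for (long s : starts(l)) {
                for (long r : right) {
                    if (r >= s && r < s + SIZE) {
                        out.add(new long[] {l, r, s + SIZE - 1});
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // First join: A@0 with B@5.
        List<Long> abTimestamps = new ArrayList<>();
        for (long[] m : windowJoin(List.of(0L), List.of(5L))) {
            abTimestamps.add(m[2]);
        }
        System.out.println("AB timestamps: " + abTimestamps); // [9]

        // Second join: AB results with C@6 and C@13.
        TreeSet<Long> matchedC = new TreeSet<>();
        for (long[] m : windowJoin(abTimestamps, List.of(6L, 13L))) {
            matchedC.add(m[1]);
        }
        System.out.println("C events joined with AB: " + matchedC); // [6, 13]
    }
}
```

Both C@6 and C@13 end up joined with AB@9; the underlying match list even contains C@6 twice, once for W[0,10) and once for W[5,15).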

Using the Table API or SQL, I think this can be solved using Interval Joins
[2]. However, the windowing semantics seem to be different, as you need to
define one table (or stream) around which you apply an interval.
Depending on which table the interval is applied around, different results can
be obtained. For example, let's say we have 3 table versions of our previous
streams, i.e. A, B and C, each with a time attribute 'ts'.
Applying an interval around table A would result in something like:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        A.ts BETWEEN B.ts - INTERVAL '5' MINUTE AND B.ts + INTERVAL '5' MINUTE AND
        A.ts BETWEEN C.ts - INTERVAL '5' MINUTE AND C.ts + INTERVAL '5' MINUTE
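Read as plain predicates, this query keeps a combination only if A.ts lies within the bound of both B.ts and C.ts (BETWEEN is inclusive on both ends). A minimal Java sketch of that check, using the example timestamps as abstract time units (IntervalAroundA is a hypothetical name):

```java
/**
 * Sketch of the predicates in the query anchored on A (BETWEEN is inclusive).
 * IntervalAroundA is a hypothetical name; timestamps are abstract time units.
 */
class IntervalAroundA {
    // True if anchorTs lies in [otherTs - bound, otherTs + bound].
    static boolean within(long anchorTs, long otherTs, long bound) {
        return anchorTs >= otherTs - bound && anchorTs <= otherTs + bound;
    }

    // A must be within the bound of both B and C.
    static boolean joins(long a, long b, long c, long bound) {
        return within(a, b, bound) && within(a, c, bound);
    }

    public static void main(String[] args) {
        System.out.println(joins(0, 5, 6, 5));  // false: A@0 is 6 away from C@6
        System.out.println(joins(0, 5, 6, 10)); // true
    }
}
```

With a bound of 5 the combination A@0, B@5, C@6 is rejected because A and C are 6 apart; with a bound of 10 it is accepted.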

So if we want a window of size 10, I think we should split the interval into 5
minutes before and 5 minutes after A? However, then A@0 is not within the
interval of C@6 (they are 6 apart). Applying an interval of 10 would solve this
problem. However, if we apply an interval of 10 both before and after, but
choose to fix the interval around B instead, we run into a different problem:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        B.ts BETWEEN A.ts - INTERVAL '10' MINUTE AND A.ts + INTERVAL '10' MINUTE AND
        B.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE
In this case B@5 is within the interval of A@0 but also of C@13, so A@0, B@5 and
C@13 are joined, even though A@0 and C@13 are 13 time units apart.
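The same kind of check for the query anchored on B shows where the problem comes from: each predicate only bounds the distance to B, so the distance between A and C is never checked. A hypothetical IntervalAroundB sketch, again in abstract time units:

```java
/**
 * Sketch of the predicates in the query anchored on B (BETWEEN is inclusive).
 * IntervalAroundB is a hypothetical name; timestamps are abstract time units.
 */
class IntervalAroundB {
    // True if anchorTs lies in [otherTs - bound, otherTs + bound].
    static boolean within(long anchorTs, long otherTs, long bound) {
        return anchorTs >= otherTs - bound && anchorTs <= otherTs + bound;
    }

    // Both predicates constrain B only; A and C are never compared directly.
    static boolean joins(long a, long b, long c, long bound) {
        return within(b, a, bound) && within(b, c, bound);
    }

    // Largest distance between any two of the joined events.
    static long span(long a, long b, long c) {
        return Math.max(a, Math.max(b, c)) - Math.min(a, Math.min(b, c));
    }

    public static void main(String[] args) {
        System.out.println(joins(0, 5, 13, 10)); // true: B@5 is within 10 of A@0 and of C@13
        System.out.println(span(0, 5, 13));      // 13: more than the intended window of 10
    }
}
```

The combination passes both predicates although its total span is 13, which no window of size 10 can cover.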

So my question is: how can I join multiple streams within a window such that
the result behaves as if all the streams were joined in the same window?
Should I write my own WindowOperator that, when two events are joined, assigns
the smallest of their timestamps instead of the time at which the window closes?
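To see whether such a timestamp assignment would help, the cascaded simulation can be rerun with each join result stamped with the smallest input timestamp instead of window end minus one. This is a plain-Java sketch of the idea under that assumption, not a Flink WindowOperator; EarliestTimestampJoin is a made-up name, and keys, watermarks and lateness are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Plain-Java sketch (not a Flink WindowOperator): stamp each join result either
 * with the window's last timestamp (end - 1) or with the smallest input timestamp.
 */
class EarliestTimestampJoin {
    static final long SIZE = 10, SLIDE = 5;

    // Starts of all sliding windows [s, s + SIZE) that contain ts.
    static List<Long> starts(long ts) {
        List<Long> out = new ArrayList<>();
        for (long s = ts - Math.floorMod(ts, SLIDE); s > ts - SIZE; s -= SLIDE) {
            out.add(s);
        }
        return out;
    }

    // Window join; the result timestamp is min(l, r) if earliest, else end - 1.
    static List<long[]> windowJoin(List<Long> left, List<Long> right, boolean earliest) {
        List<long[]> out = new ArrayList<>();
        for (long l : left) {
            for (long s : starts(l)) {
                for (long r : right) {
                    if (r >= s && r < s + SIZE) {
                        out.add(new long[] {l, r, earliest ? Math.min(l, r) : s + SIZE - 1});
                    }
                }
            }
        }
        return out;
    }

    // C timestamps joined with the result of A@0 join B@5, for the chosen stamping rule.
    static TreeSet<Long> matchedC(boolean earliest) {
        TreeSet<Long> matched = new TreeSet<>();
        for (long[] ab : windowJoin(List.of(0L), List.of(5L), earliest)) {
            for (long[] abc : windowJoin(List.of(ab[2]), List.of(6L, 13L), earliest)) {
                matched.add(abc[1]);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(matchedC(false)); // [6, 13]  (AB stamped 9)
        System.out.println(matchedC(true));  // [6]      (AB stamped 0)
    }
}
```

For this example, stamping AB with 0 leaves only C@6 joined. The rule is not equivalent to a shared window in general, though: A@4, B@5 and C@-3 share no sliding window of size 10, yet in this simulation AB@4 still joins C@-3 through W[-5,5). In a real job, a result re-stamped with an earlier timestamp may also already be late for some downstream windows.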

Thanks in advance!

Kind regards,
Pieter

// code examples taken from [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[3] https://stackoverflow.com/a/50879029
-------------------------------------------------------------------------
Dr. Ir. Pieter Bonte
Ghent University - imec 
IDLab 
iGent Tower - Department of Information Technology 
Technologiepark-Zwijnaarde 126, B-9052 Ghent, Belgium 
T: +32 9 33 14938; T Secr: +32 (0)9 33 14900 
F: +32 9 33 14899 
E: pieter.bo...@ugent.be 
W: IDLab.technology 
W: IDLab.ugent.be 
