Hi Till,
Thanks for the feedback.
My use case is a bit trickier, as I can’t key all the streams by the
same field.
Basically, I’m trying to evaluate continuous SPARQL queries, which consist of
many joins. I’ve seen that SPARQL queries over RDF data have been discussed
before on the mailing list, however, not for RDF streams that are only valid
within a certain time window.
To give you an example of a simple query, which looks for 'Persons that are
sitting next to Students':
SELECT * WHERE {
  ?x a Person.
  ?x nextTo ?y.
  ?y a Student.
}
So everything that matches ‘?x a Person’ could be my A Stream, ‘?x nextTo ?y’ a
B Stream and ‘?y a Student’ a C Stream.
So for the first join, '?x a Person’ and '?x nextTo ?y’ need to be joined on
variable ?x, i.e. the first field of the A and B streams, while '?x nextTo ?y’
and '?y a Student’ need to be joined on variable ?y, i.e. the second field of
the B stream and the first field of the C stream.
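To make the join pattern concrete, here is a minimal, Flink-free sketch of the
two equi-joins over triple bindings (plain Java with illustrative names; the
real implementation of course has to happen on streams):

```java
import java.util.*;

public class TwoStepJoin {
    // Join A (?x a Person) with B (?x nextTo ?y) on ?x,
    // then with C (?y a Student) on ?y.
    public static List<String[]> join(List<String[]> a, List<String[]> b, List<String[]> c) {
        // Index the A stream by ?x (its first field).
        Map<String, List<String[]>> aByX = new HashMap<>();
        for (String[] t : a) aByX.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t);
        // Index the C stream by ?y (its first field).
        Map<String, List<String[]>> cByY = new HashMap<>();
        for (String[] t : c) cByY.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t);
        List<String[]> out = new ArrayList<>();
        // Probe with B: its first field joins on ?x, its second field on ?y.
        for (String[] t : b) {
            if (aByX.containsKey(t[0]) && cByY.containsKey(t[1])) {
                out.add(new String[]{t[0], t[1]}); // one (?x, ?y) binding
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> a = List.of(new String[]{"alice", "Person"});
        List<String[]> b = List.of(new String[]{"alice", "bob"});
        List<String[]> c = List.of(new String[]{"bob", "Student"});
        System.out.println(join(a, b, c).size()); // prints 1
    }
}
```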
As I can’t key the streams before windowing, I tried to combine the streams,
window them and assign the window end time to each event. Then I separated the
streams again and joined them using a CoProcessFunction. Based on the window
end time, I know which events should be joined, as they are contained in the
same window. I thought I could store the events of both streams and the last
seen window end timestamp. If an event arrives with a larger window end time,
I could join the previously seen events from both streams and clear them.
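Concretely, the join step I tried looks roughly like this (a fragment, not
runnable on its own; Event, Result, join() and the state handles are
illustrative names for my own types):

```java
// Fragment of one side of my CoProcessFunction (untested sketch).
// leftBuffer/rightBuffer are ListState<Event>, lastWindowEnd is ValueState<Long>.
@Override
public void processElement1(Event left, Context ctx, Collector<Result> out) throws Exception {
    Long last = lastWindowEnd.value();
    if (last != null && left.windowEnd > last) {
        // A later window has started: join and clear everything buffered so far.
        for (Event l : leftBuffer.get()) {
            for (Event r : rightBuffer.get()) {
                out.collect(join(l, r));
            }
        }
        leftBuffer.clear();
        rightBuffer.clear();
    }
    lastWindowEnd.update(last == null ? left.windowEnd : Math.max(last, left.windowEnd));
    leftBuffer.add(left);
}
// processElement2 mirrors this for the other stream.
```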
However, I see that the events arrive out of order in my CoProcessFunction,
requiring me to store the content of various windows. I was a little surprised
by this behaviour, but perhaps it’s because I’m using a fixed dataset for easy
testing? The idea then was to store the content of multiple windows and use
the progression of the watermark to know which windows could be cleared, so I
don’t need to store the content of all possible previous windows.
However, it does not seem possible to use a MapState with a list as value
type. The map state would contain the end time of each window as key, and a
list of the previously seen content of that window as value.
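What I had in mind is a declaration roughly like the following (a fragment,
untested; Event and the descriptor name are illustrative, and I’m assuming
Flink’s Types utility can build the list type information):

```java
// Intended state layout: window end time -> events seen for that window.
MapStateDescriptor<Long, List<Event>> desc =
    new MapStateDescriptor<>(
        "windowContents",
        Types.LONG,
        Types.LIST(Types.POJO(Event.class)));
windowContents = getRuntimeContext().getMapState(desc);
```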
I’m guessing that there are more elegant and easier ways to solve this?
For the Table API, I was able to find a solution as well. I first combine the
streams, window them and again assign the window end time as the timestamp of
each event. I then split the streams and convert them to tables. As the window
end times are assigned, I can use them to window the data using intervals,
e.g. ‘A.ts BETWEEN B.ts AND B.ts’. This solution works, and it is easier to
translate the SPARQL query to SQL. However, the program does not
garbage-collect the outdated content of the streams, as a window using the
DataStream API would, and I see that my Flink program keeps growing in size.
Is there a translation of the Table API windows to DataStream windows?
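For reference, the query I run looks roughly like this (table and column
names are illustrative; the timestamps are the assigned window end times, so
the interval bounds degenerate to equality):

```sql
SELECT *
FROM A, B, C
WHERE A.x = B.x
  AND B.y = C.y
  -- same window end time => same window
  AND A.ts BETWEEN B.ts AND B.ts
  AND C.ts BETWEEN B.ts AND B.ts
```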
Should I use the ‘setIdleStateRetentionTime’ configuration function, to remove
state?
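If so, I assume it would be configured roughly like this (a sketch against
the Flink 1.12-era TableConfig API; the retention values are illustrative):

```java
// Clean up state for keys that have been idle between min and max time.
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setIdleStateRetentionTime(
    org.apache.flink.api.common.time.Time.hours(12),
    org.apache.flink.api.common.time.Time.hours(24));
```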
Thanks in advance!
Kind regards,
Pieter
-----
Dr. Ir. Pieter Bonte
Ghent University - imec
IDLab
iGent Tower - Department of Information Technology
Technologiepark-Zwijnaarde 126, B-9052 Ghent, Belgium
T: +32 9 33 14938; T Secr: +32 (0)9 33 14900
F: +32 9 33 14899
E: pieter.bo...@ugent.be
W: IDLab.technology
W: IDLab.ugent.be
> On 17 Feb 2021, at 10:00, Till Rohrmann wrote:
>
> Hi Pieter,
>
> from the top of my head, I think the easiest way to solve this problem is to
> implement your own "window join" operation by first unioning all three
> streams and then applying a ProcessWindowFunction similar to
>
> allEvents
>     .keyBy((KeySelector) value -> value)
>     .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
>     .process(new ProcessWindowFunction() {
>         @Override
>         public void process(
>                 Tuple tuple,
>                 Context context,
>                 Iterable elements,
>                 Collector out) throws Exception {
>             // compute join result from elements
>         }
>     });
>
> @Timo is there an easier way using Flink's SQL or Table API?
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 3:36 PM Pieter Bonte <pieter.bo...@ugent.be> wrote:
> Hi all,
>
> I’m trying to apply a window operator over multiple streams (more than 2) and
> join these streams within the validity of the window. However, I have some
> questions about the time semantics using both the DataStream API and the
> Table API/SQL.
>
> Let’s say we have 3 streams: an A, a B and a C stream. Currently we have an
> A@0 (an A at timestamp 0), a B@5 and two C’s: C@6 and C@13.
> We would like to join these streams when they fall within a sliding w