Interval join is exactly what I'm looking for. Thanks for pointing it out!
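
For anyone who finds this thread later, here is a minimal plain-Java sketch (no Flink dependency) of why the duplicates appear and why an interval join avoids them. It is a simulation under stated assumptions, not Flink's implementation: the class and method names are hypothetical, the sample timestamps are treated as seconds and scaled to milliseconds, and the 200 ms window with a 100 ms slide is a guess chosen because it reproduces the two copies seen in the question. In Flink itself, the corresponding calls would be `intervalJoin(...)` on a keyed stream, followed by `between(...)` and `process(...)`, as described in the linked documentation.

```java
import java.util.ArrayList;
import java.util.List;

class TriadJoinSketch {

    /** A directed edge with an event time in milliseconds (hypothetical type). */
    static final class Edge {
        final long timeMs;
        final int source;
        final int target;

        Edge(long timeMs, int source, int target) {
            this.timeMs = timeMs;
            this.source = source;
            this.target = target;
        }
    }

    /** The ten sample edges from the question; times assumed to be seconds, scaled to ms. */
    static List<Edge> sampleEdges() {
        long[][] rows = {
            {0, 4, 0}, {10, 1, 5}, {20, 3, 7}, {30, 0, 8}, {40, 0, 9},
            {50, 4, 8}, {60, 4, 3}, {70, 5, 9}, {80, 7, 1}, {90, 9, 6}
        };
        List<Edge> edges = new ArrayList<>();
        for (long[] r : rows) {
            edges.add(new Edge(r[0], (int) r[1], (int) r[2]));
        }
        return edges;
    }

    /** True if a ends where b starts and b follows a by at most queryWindowMs. */
    static boolean isTriad(Edge a, Edge b, long queryWindowMs) {
        long gap = b.timeMs - a.timeMs;
        return a.target == b.source && gap > 0 && gap <= queryWindowMs;
    }

    static String format(Edge a, Edge b) {
        return a.timeMs + "," + a.source + "," + a.target
            + " -> " + b.timeMs + "," + b.source + "," + b.target;
    }

    /**
     * Sliding-window self-join: a pair is emitted once for every window that
     * contains both edges. Assumes non-negative times and a window size that
     * is a multiple of the slide.
     */
    static List<String> slidingWindowJoin(List<Edge> edges, long sizeMs,
                                          long slideMs, long queryWindowMs) {
        List<String> out = new ArrayList<>();
        long maxTime = 0;
        for (Edge e : edges) {
            maxTime = Math.max(maxTime, e.timeMs);
        }
        // Start with the earliest window that can still contain time 0.
        for (long start = -sizeMs + slideMs; start <= maxTime; start += slideMs) {
            long end = start + sizeMs;
            for (Edge a : edges) {
                for (Edge b : edges) {
                    boolean bothInWindow = a.timeMs >= start && a.timeMs < end
                        && b.timeMs >= start && b.timeMs < end;
                    if (bothInWindow && isTriad(a, b, queryWindowMs)) {
                        out.add(format(a, b));
                    }
                }
            }
        }
        return out;
    }

    /** Interval-style join: bounds are relative to each edge, so each pair is emitted once. */
    static List<String> intervalJoin(List<Edge> edges, long queryWindowMs) {
        List<String> out = new ArrayList<>();
        for (Edge a : edges) {
            for (Edge b : edges) {
                if (isTriad(a, b, queryWindowMs)) {
                    out.add(format(a, b));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Edge> edges = sampleEdges();
        System.out.println("sliding-window join: "
            + slidingWindowJoin(edges, 200, 100, 100).size() + " triads");
        System.out.println("interval join: "
            + intervalJoin(edges, 100).size() + " triads");
    }
}
```

With these assumed settings, the sliding-window version reports 12 triads (each of the 6 distinct triads appears in both overlapping windows), while the interval-style version reports 6.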

On Mon, Oct 8, 2018 at 9:13 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Did you check the new interval join that was added with Flink 1.6.0 [1]?
> It might be better suited because each record has its own boundaries
> based on its timestamp and the join window interval.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join
>
> On Mon, Oct 8, 2018 at 4:44 PM Eric L Goodman
> <eric.good...@colorado.edu> wrote:
>
>> If I change it to a tumbling window, some of the results will be lost,
>> since the pattern I'm matching has a temporal extent: if the pattern
>> starts in one tumbling window and ends in the next, it won't be reported.
>> Based on the temporal length of the query, you can set the slide and
>> window lengths to capture all the patterns, though as you note, you
>> will get duplicates.
>>
>> On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Eric,
>>>
>>> Can you change the sliding window to a tumbling window? The data in
>>> different sliding windows is likely to overlap.
>>>
>>> Best, Hequn
>>>
>>> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński <wos...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>> IMHO, the simplest way in your case would be to use the Evictor to
>>>> evict duplicate values after the window is generated. Have a look at
>>>> it here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>>>
>>>> Best Regards,
>>>> Dominik.
>>>>
>>>> On Mon, Oct 8, 2018 at 8:00 AM Eric L Goodman
>>>> <eric.good...@colorado.edu> wrote:
>>>>
>>>>> What is the best way to avoid or remove duplicates when joining a
>>>>> stream with itself? I'm performing a streaming temporal triangle
>>>>> computation, and the first part is to find triads of two edges of the
>>>>> form vertexA->vertexB and vertexB->vertexC (with the temporal
>>>>> constraint that the first edge occurs before the second edge). To do
>>>>> that, I have the following code:
>>>>>
>>>>> DataStream<Triad> triads = edges.join(edges)
>>>>>     .where(new DestKeySelector())
>>>>>     .equalTo(new SourceKeySelector())
>>>>>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>>>>         Time.milliseconds(slideSizeMs)))
>>>>>     .apply(new EdgeJoiner(queryWindow));
>>>>>
>>>>> However, when I look at the triads being built, there are two copies
>>>>> of each triad.
>>>>>
>>>>> For example, if I create ten edges (time, source, target):
>>>>>
>>>>> 0.0, 4, 0
>>>>> 0.01, 1, 5
>>>>> 0.02, 3, 7
>>>>> 0.03, 0, 8
>>>>> 0.04, 0, 9
>>>>> 0.05, 4, 8
>>>>> 0.06, 4, 3
>>>>> 0.07, 5, 9
>>>>> 0.08, 7, 1
>>>>> 0.09, 9, 6
>>>>>
>>>>> It creates the following triads (time1, source1, target1, time2,
>>>>> source2, target2). Note there are two copies of each.
>>>>>
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>>
>>>>> I'm assuming this behavior has something to do with the joining of
>>>>> "edges" with itself.
>>>>>
>>>>> I can provide more code if that would be helpful, but I believe I've
>>>>> captured the most salient portion.