Interval join is exactly what I'm looking for.  Thanks for pointing it out!

On Mon, Oct 8, 2018 at 9:13 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Did you check the new interval join that was added with Flink 1.6.0 [1]?
> It might be better suited because, each record has its own boundaries
> based on its timestamp and the join window interval.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join
>
> Am Mo., 8. Okt. 2018 um 16:44 Uhr schrieb Eric L Goodman <
> eric.good...@colorado.edu>:
>
>> If I change it to a Tumbling window some of the results will be lost
>> since the pattern I'm matching has a temporal extent, so if the pattern
>> starts in one tumbling window and ends in the next, it won't be reported.
>> Based on the temporal length of the query, you can set the sliding window
>> and the window lengths to capture all the patterns, though as you note, you
>> will get duplicates.
>>
>> On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Eric,
>>>
>>> Can you change Sliding window to Tumbling window? The data of different
>>> sliding window are likely overlap.
>>>
>>> Best, Hequn
>>>
>>> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński <wos...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>> IMHO, the simplest way in your case would be to use the Evictor to
>>>> evict duplicate values after the window is generated. Have look at it here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>>>
>>>> Best Regards,
>>>> Dominik.
>>>>
>>>> pon., 8 paź 2018 o 08:00 Eric L Goodman <eric.good...@colorado.edu>
>>>> napisał(a):
>>>>
>>>>> What is the best way to avoid or remove duplicates when joining a
>>>>> stream with itself?  I'm performing a streaming temporal triangle
>>>>> computation and the first part is to find triads of two edges of the form
>>>>> vertexA->vertexB and vertexB->vertexC (and there are temporal constraints
>>>>> where the first edge occurs before the second edge).  To do that, I have
>>>>> the following code:
>>>>>
>>>>> DataStream<Triad> triads = edges.join(edges)
>>>>>     .where(new DestKeySelector())
>>>>>     .equalTo(new SourceKeySelector())
>>>>>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>>>>         Time.milliseconds(slideSizeMs)))
>>>>>     .apply(new EdgeJoiner(queryWindow));
>>>>>
>>>>> However, when I look at the triads being built, there are two copies of 
>>>>> each triad.
>>>>>
>>>>> For example, if I create ten edges (time, source, target):
>>>>>
>>>>> 0.0, 4, 0
>>>>>
>>>>> 0.01, 1, 5
>>>>>
>>>>> 0.02, 3, 7
>>>>>
>>>>> 0.03, 0, 8
>>>>>
>>>>> 0.04, 0, 9
>>>>>
>>>>> 0.05, 4, 8
>>>>>
>>>>> 0.06, 4, 3
>>>>>
>>>>> 0.07, 5, 9
>>>>>
>>>>> 0.08, 7, 1
>>>>>
>>>>> 0.09, 9, 6
>>>>>
>>>>>
>>>>> It creates the following triads (time1, source1, target1, time2,
>>>>> source2, targe2). Note there are two copies of each.
>>>>>
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>>
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>>
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>>
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>>
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>>
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>>
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>>
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>>
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>>
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>>
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>>
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>>
>>>>> I'm assuming this behavior has something to do with the joining of 
>>>>> "edges" with itself.
>>>>>
>>>>> I can provide more code if that would be helpful, but I believe I've 
>>>>> captured the most salient portion.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>

Reply via email to