Did you check the new interval join that was added with Flink 1.6.0 [1]?
It might be better suited because each record has its own boundaries, based
on its timestamp and the join window interval.
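
To make the semantics concrete, here is a minimal plain-Java sketch (it does
not use the Flink API) of what I understand an interval join to compute: a
pair is emitted when the keys match and the right-hand timestamp lies within
[left + lower, left + upper], so each pair is considered once rather than once
per window. The Edge and Triad classes, the method names, and the 1 ms / 100 ms
bounds are assumptions for illustration; the edges are the ten from the
original message, with times converted to milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class IntervalJoinSketch {
    /** Hypothetical edge type: event time in milliseconds, source and target vertex ids. */
    static final class Edge {
        final long timeMs;
        final int source;
        final int target;

        Edge(long timeMs, int source, int target) {
            this.timeMs = timeMs;
            this.source = source;
            this.target = target;
        }

        @Override
        public String toString() {
            return timeMs + "ms " + source + "->" + target;
        }
    }

    /** Hypothetical triad type: two edges that share the middle vertex. */
    static final class Triad {
        final Edge first;
        final Edge second;

        Triad(Edge first, Edge second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return first + " | " + second;
        }
    }

    /** Emits (a, b) once when a.target == b.source and b falls in a's interval (bounds inclusive). */
    static List<Triad> intervalJoin(List<Edge> edges, long lowerMs, long upperMs) {
        List<Triad> out = new ArrayList<>();
        for (Edge a : edges) {
            for (Edge b : edges) {
                boolean sameKey = a.target == b.source;
                boolean inInterval = b.timeMs >= a.timeMs + lowerMs
                        && b.timeMs <= a.timeMs + upperMs;
                if (sameKey && inInterval) {
                    out.add(new Triad(a, b));
                }
            }
        }
        return out;
    }

    /** The ten sample edges from the original message (seconds converted to milliseconds). */
    static List<Edge> sampleEdges() {
        return Arrays.asList(
                new Edge(0, 4, 0), new Edge(10, 1, 5), new Edge(20, 3, 7),
                new Edge(30, 0, 8), new Edge(40, 0, 9), new Edge(50, 4, 8),
                new Edge(60, 4, 3), new Edge(70, 5, 9), new Edge(80, 7, 1),
                new Edge(90, 9, 6));
    }

    public static void main(String[] args) {
        // Lower bound of 1 ms: the second edge must be strictly later than the first.
        for (Triad t : intervalJoin(sampleEdges(), 1, 100)) {
            System.out.println(t);
        }
    }
}
```

With these assumed bounds the sketch yields each of the six triads once. In
Flink itself this would roughly correspond to keying one side by target and
the other by source and applying the interval join described in [1]; the
actual API calls are left out of the sketch.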

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join

On Mon, Oct 8, 2018 at 4:44 PM Eric L Goodman <
eric.good...@colorado.edu> wrote:

> If I change it to a tumbling window, some of the results will be lost: the
> pattern I'm matching has a temporal extent, so a pattern that starts in one
> tumbling window and ends in the next won't be reported.  Based on the
> temporal length of the query, you can set the slide and window lengths to
> capture all the patterns, though as you note, you will get duplicates.
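
A small plain-Java sketch of why this trade-off arises, assuming sliding
windows are half-open intervals [start, start + size) whose starts are
multiples of the slide (my understanding of the default assignment; no Flink
classes are used, and the class and method names are made up for
illustration). A windowed join emits a pair once per window that contains both
elements, so the number of shared windows is the number of copies. The sizes
below are assumed, since windowSizeMs and slideSizeMs are not given in the
thread.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowOverlap {
    /** Start times of all windows [start, start + sizeMs) that contain timestamp t. */
    static List<Long> windowStarts(long t, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, slideMs);
        for (long start = lastStart; start > t - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    /** Number of windows containing both timestamps, i.e. copies of the joined pair. */
    static int sharedWindows(long t1, long t2, long sizeMs, long slideMs) {
        List<Long> shared = windowStarts(t1, sizeMs, slideMs);
        shared.retainAll(windowStarts(t2, sizeMs, slideMs));
        return shared.size();
    }

    public static void main(String[] args) {
        // Tumbling 50 ms windows: edges at 10 ms and 70 ms never meet, so the pattern is lost.
        System.out.println(sharedWindows(10, 70, 50, 50));   // 0
        // Sliding 100 ms windows, 50 ms slide: the same pair meets in exactly one window.
        System.out.println(sharedWindows(10, 70, 100, 50));  // 1
        // Sliding 200 ms windows, 100 ms slide: edges at 0 ms and 30 ms share two windows.
        System.out.println(sharedWindows(0, 30, 200, 100));  // 2
    }
}
```

Under these assumptions, a size that is twice the slide would be consistent
with seeing exactly two copies of every triad whose edges fall within one
slide of each other.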
>
> On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Eric,
>>
>> Can you change the sliding window to a tumbling window? The data in
>> different sliding windows is likely to overlap.
>>
>> Best, Hequn
>>
>> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński <wos...@gmail.com> wrote:
>>
>>> Hey,
>>> IMHO, the simplest way in your case would be to use the Evictor to evict
>>> duplicate values after the window is generated. Have a look at it here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>>
>>> Best Regards,
>>> Dominik.
>>>
>>> On Mon, Oct 8, 2018 at 08:00 Eric L Goodman <eric.good...@colorado.edu>
>>> wrote:
>>>
>>>> What is the best way to avoid or remove duplicates when joining a
>>>> stream with itself?  I'm performing a streaming temporal triangle
>>>> computation and the first part is to find triads of two edges of the form
>>>> vertexA->vertexB and vertexB->vertexC (and there are temporal constraints
>>>> where the first edge occurs before the second edge).  To do that, I have
>>>> the following code:
>>>>
>>>> DataStream<Triad> triads = edges.join(edges)
>>>>     .where(new DestKeySelector())
>>>>     .equalTo(new SourceKeySelector())
>>>>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>>>         Time.milliseconds(slideSizeMs)))
>>>>     .apply(new EdgeJoiner(queryWindow));
>>>>
>>>> However, when I look at the triads being built, there are two copies of 
>>>> each triad.
>>>>
>>>> For example, if I create ten edges (time, source, target):
>>>>
>>>> 0.0, 4, 0
>>>> 0.01, 1, 5
>>>> 0.02, 3, 7
>>>> 0.03, 0, 8
>>>> 0.04, 0, 9
>>>> 0.05, 4, 8
>>>> 0.06, 4, 3
>>>> 0.07, 5, 9
>>>> 0.08, 7, 1
>>>> 0.09, 9, 6
>>>>
>>>> It creates the following triads (time1, source1, target1, time2,
>>>> source2, target2). Note there are two copies of each.
>>>>
>>>> 0.0, 4, 0 0.03, 0, 8
>>>> 0.0, 4, 0 0.03, 0, 8
>>>> 0.0, 4, 0 0.04, 0, 9
>>>> 0.0, 4, 0 0.04, 0, 9
>>>> 0.01, 1, 5 0.07, 5, 9
>>>> 0.01, 1, 5 0.07, 5, 9
>>>> 0.02, 3, 7 0.08, 7, 1
>>>> 0.02, 3, 7 0.08, 7, 1
>>>> 0.04, 0, 9 0.09, 9, 6
>>>> 0.04, 0, 9 0.09, 9, 6
>>>> 0.07, 5, 9 0.09, 9, 6
>>>> 0.07, 5, 9 0.09, 9, 6
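
If the duplicates cannot be avoided upstream, one generic option is to filter
them after the join. The following is a minimal plain-Java sketch of such a
filter (a "seen" set keyed by the triad's content), not a Flink operator; the
class and method names are hypothetical, and the input is simply the twelve
rows listed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TriadDedupSketch {
    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time a triad key is offered, false for repeats. */
    boolean firstOccurrence(String triadKey) {
        return seen.add(triadKey);
    }

    /** Keeps the first copy of every triad, preserving arrival order. */
    static List<String> distinct(List<String> triads) {
        TriadDedupSketch filter = new TriadDedupSketch();
        List<String> out = new ArrayList<>();
        for (String triad : triads) {
            if (filter.firstOccurrence(triad)) {
                out.add(triad);
            }
        }
        return out;
    }

    /** The twelve rows from the message, each triad appearing twice. */
    static List<String> reportedTriads() {
        return Arrays.asList(
                "0.0, 4, 0 0.03, 0, 8", "0.0, 4, 0 0.03, 0, 8",
                "0.0, 4, 0 0.04, 0, 9", "0.0, 4, 0 0.04, 0, 9",
                "0.01, 1, 5 0.07, 5, 9", "0.01, 1, 5 0.07, 5, 9",
                "0.02, 3, 7 0.08, 7, 1", "0.02, 3, 7 0.08, 7, 1",
                "0.04, 0, 9 0.09, 9, 6", "0.04, 0, 9 0.09, 9, 6",
                "0.07, 5, 9 0.09, 9, 6", "0.07, 5, 9 0.09, 9, 6");
    }

    public static void main(String[] args) {
        List<String> unique = distinct(reportedTriads());
        System.out.println(unique.size() + " distinct triads"); // 6 distinct triads
        for (String triad : unique) {
            System.out.println(triad);
        }
    }
}
```

On an unbounded stream the set would grow without limit, so a real job would
presumably need to expire entries once no further copies can arrive; that part
is not shown here.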
>>>>
>>>> I'm assuming this behavior has something to do with the joining of "edges" 
>>>> with itself.
>>>>
>>>> I can provide more code if that would be helpful, but I believe I've 
>>>> captured the most salient portion.
