What is the best way to avoid or remove duplicates when joining a stream
with itself?  I'm performing a streaming temporal triangle computation and
the first part is to find triads of two edges of the form vertexA->vertexB
and vertexB->vertexC (and there are temporal constraints where the first
edge occurs before the second edge).  To do that, I have the following code:

DataStream<Triad> triads = edges.join(edges)
    .where(new DestKeySelector())
    .equalTo(new SourceKeySelector())
    .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
        Time.milliseconds(slideSizeMs)))
    .apply(new EdgeJoiner(queryWindow));

However, when I look at the triads being built, there are two copies
of each triad.

For example, if I create ten edges (time, source, target):

0.0, 4, 0

0.01, 1, 5

0.02, 3, 7

0.03, 0, 8

0.04, 0, 9

0.05, 4, 8

0.06, 4, 3

0.07, 5, 9

0.08, 7, 1

0.09, 9, 6


It creates the following triads (time1, source1, target1, time2, source2,
targe2). Note there are two copies of each.

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.04, 0, 9

0.0, 4, 0 0.04, 0, 9

0.01, 1, 5 0.07, 5, 9

0.01, 1, 5 0.07, 5, 9

0.02, 3, 7 0.08, 7, 1

0.02, 3, 7 0.08, 7, 1

0.04, 0, 9 0.09, 9, 6

0.04, 0, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

I'm assuming this behavior has something to do with the joining of
"edges" with itself.

I can provide more code if that would be helpful, but I believe I've
captured the most salient portion.

Reply via email to