mjsax commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1527393681
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ########## @@ -727,7 +801,7 @@ public void testWindowing() { } @Test - public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { + public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { Review Comment: For this PR, let it be, but for the future, please avoid unnecessary re-naming. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) { TimestampedKeyAndJoinSide<K> prevKey = null; + boolean outerJoinLeftWindowOpen = false; + boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { - boolean outerJoinLeftBreak = false; - boolean outerJoinRightBreak = false; final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next(); final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key; - final LeftOrRightValue<V1, V2> value = next.value; - final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; - // Skip next records if window has not closed + // Skip next records if window has not closed yet + // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { - outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side - } else { - outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side - } - if (outerJoinLeftBreak && outerJoinRightBreak) { - break; // 
there are no more candidates to emit on left-outerJoin-side and - // right-outerJoin-side + outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { - continue; // there are possibly candidates left on the other outerJoin-side + outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side } } - final VOut nullJoinedValue; - if (isLeftSide) { - nullJoinedValue = joiner.apply(key, - value.getLeftValue(), - value.getRightValue()); - } else { - nullJoinedValue = joiner.apply(key, - (V1) value.getRightValue(), - (V2) value.getLeftValue()); + if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { + // if windows are open for both joinSides we can break since there are no more candidates to emit + break; + } else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) { + // else if window is open only for this joinSide we continue with the next outer record + continue; } - context().forward( - record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) - ); - - if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) { - // blind-delete the previous key from the outer window store now it is emitted; - // we do this because this delete would remove the whole list of values of the same key, - // and hence if we delete eagerly and then fail, we would miss emitting join results of the later - // values in the list. - // we do not use delete() calls since it would incur extra get() - store.put(prevKey, null); + final K key = timestampedKeyAndJoinSide.getKey(); + final LeftOrRightValue<V1, V2> leftOrRightValue = next.value; + final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue); + if (nullJoinedValue != null) { Review Comment: Why this condition? The user's `ValueJoiner` can return anything, including `null` and we would still emit a result record. 
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) { TimestampedKeyAndJoinSide<K> prevKey = null; + boolean outerJoinLeftWindowOpen = false; + boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { - boolean outerJoinLeftBreak = false; - boolean outerJoinRightBreak = false; final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next(); final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key; - final LeftOrRightValue<V1, V2> value = next.value; - final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; - // Skip next records if window has not closed + // Skip next records if window has not closed yet + // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { - outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side - } else { - outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side - } - if (outerJoinLeftBreak && outerJoinRightBreak) { - break; // there are no more candidates to emit on left-outerJoin-side and - // right-outerJoin-side + outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { - continue; // there are possibly candidates left on the other outerJoin-side + outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side Review Comment: As above, I think we can `continue` 
right away. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ########## @@ -511,14 +511,88 @@ public void testGracePeriod() { // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) } - inputTopic2.pipeInput(0, "dummy", 211); + inputTopic2.pipeInput(0, "dummy", 112); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(1, "null+a1", 0L), new KeyValueTimestamp<>(0, "A0+null", 0L) ); } } + @Test + public void testEmitAllNonJoinedResultsForAsymmetricWindow() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + + joined = stream1.outerJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) + ); + joined.process(supplier); + + final Collection<Set<String>> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { + final TestInputTopic<Integer, String> inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic<Integer, String> inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); 
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); + + // push one item to the primary stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = {} + // w2 = {} + // --> w1 = { 0:A0 (ts: 29) } + // --> w2 = {} + inputTopic1.pipeInput(0, "A0", 29L); + processor.checkAndClearProcessResult(); + + // push another item to the primary stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = { 0:A0 (ts: 29) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = {} + inputTopic1.pipeInput(1, "A1", 30L); + processor.checkAndClearProcessResult(); + + // push one item to the other stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = { 2:a2 (ts: 31) } + inputTopic2.pipeInput(2, "a2", 31L); + processor.checkAndClearProcessResult(); + Review Comment: Should we add another step, pushing a record with `ts=36` to ensure the window for right hand side record with `ts=31` which ends and closes at `36` is not emitted yet? 
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) { TimestampedKeyAndJoinSide<K> prevKey = null; + boolean outerJoinLeftWindowOpen = false; + boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { - boolean outerJoinLeftBreak = false; - boolean outerJoinRightBreak = false; final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next(); final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key; - final LeftOrRightValue<V1, V2> value = next.value; - final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; - // Skip next records if window has not closed + // Skip next records if window has not closed yet + // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { - outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side - } else { - outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side - } - if (outerJoinLeftBreak && outerJoinRightBreak) { - break; // there are no more candidates to emit on left-outerJoin-side and - // right-outerJoin-side + outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side Review Comment: I think we can already `continue` here right away? After we found a left record with an open window, we know we don't emit it now, and can go to the next record right away?
This should simplify the logic further below (cf other comments)? ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ########## @@ -511,14 +511,88 @@ public void testGracePeriod() { // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) } - inputTopic2.pipeInput(0, "dummy", 211); + inputTopic2.pipeInput(0, "dummy", 112); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(1, "null+a1", 0L), new KeyValueTimestamp<>(0, "A0+null", 0L) ); } } + @Test + public void testEmitAllNonJoinedResultsForAsymmetricWindow() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + + joined = stream1.outerJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) + ); + joined.process(supplier); + + final Collection<Set<String>> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { + final TestInputTopic<Integer, String> inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic<Integer, String> inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); + + // push one item to the primary stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = {} + // w2 = {} + // --> w1 = { 0:A0 (ts: 29) } + // --> w2 = {} + inputTopic1.pipeInput(0, "A0", 29L); + processor.checkAndClearProcessResult(); + + // push another item to the primary stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = { 0:A0 (ts: 29) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = {} + inputTopic1.pipeInput(1, "A1", 30L); + processor.checkAndClearProcessResult(); + + // push one item to the other stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = { 2:a2 (ts: 31) } + inputTopic2.pipeInput(2, "a2", 31L); + processor.checkAndClearProcessResult(); + + // push another item to the other stream; this should produce no joined-items because there are no joins Review Comment: Should we split this into first pushing a record with ts=37, to only close one window, to ensure that the other open window does not prevent us from emitting, and also to ensure the still-open window is not closed too early?
nit: `no inner joined-items because there are no matching keys` ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) { TimestampedKeyAndJoinSide<K> prevKey = null; + boolean outerJoinLeftWindowOpen = false; + boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { - boolean outerJoinLeftBreak = false; - boolean outerJoinRightBreak = false; final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next(); final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key; - final LeftOrRightValue<V1, V2> value = next.value; - final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; - // Skip next records if window has not closed + // Skip next records if window has not closed yet + // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { - outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side - } else { - outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side - } - if (outerJoinLeftBreak && outerJoinRightBreak) { - break; // there are no more candidates to emit on left-outerJoin-side and - // right-outerJoin-side + outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { - continue; // there are possibly candidates left on the other outerJoin-side + outerJoinRightWindowOpen = true; // there are no more candidates to emit on
right-outerJoin-side } } - final VOut nullJoinedValue; - if (isLeftSide) { - nullJoinedValue = joiner.apply(key, - value.getLeftValue(), - value.getRightValue()); - } else { - nullJoinedValue = joiner.apply(key, - (V1) value.getRightValue(), - (V2) value.getLeftValue()); + if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { Review Comment: I think we should move this check further up, i.e., directly as the first check when we enter the `while (it.hasNext())` loop -- for this case, we only need the "then" part and we can remove the else part (as we already added `continue` above), and thus can drop the `windowOpenForJoinSide` helper method? ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ########## @@ -511,14 +511,88 @@ public void testGracePeriod() { // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) } - inputTopic2.pipeInput(0, "dummy", 211); + inputTopic2.pipeInput(0, "dummy", 112); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(1, "null+a1", 0L), new KeyValueTimestamp<>(0, "A0+null", 0L) ); } } + @Test + public void testEmitAllNonJoinedResultsForAsymmetricWindow() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + + joined = stream1.outerJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) + ); + joined.process(supplier); + + final Collection<Set<String>> copartitionGroups = +
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { + final TestInputTopic<Integer, String> inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic<Integer, String> inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); + + // push one item to the primary stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = {} + // w2 = {} + // --> w1 = { 0:A0 (ts: 29) } + // --> w2 = {} + inputTopic1.pipeInput(0, "A0", 29L); + processor.checkAndClearProcessResult(); + + // push another item to the primary stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = { 0:A0 (ts: 29) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = {} + inputTopic1.pipeInput(1, "A1", 30L); + processor.checkAndClearProcessResult(); + + // push one item to the other stream; this should not produce any items because there are no joins + // and window has not ended + // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = { 2:a2 (ts: 31) } + inputTopic2.pipeInput(2, "a2", 31L); + processor.checkAndClearProcessResult(); + + // push another item to the other stream; this should produce no joined-items because there are no joins + // and should produce a not-joined-item of the left joinSide because after window has ended Review Comment: nit: `not-joined-item of the left 
joinSide` -> `a left-join item` (also next line below) to simplify terminology and re-use terminology we use in other tests: - inner result: `<key, (left, right)>` - left result: `<key, (left, null)>` - right result: `<key, (null, right)>` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org