Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1527973416 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftWindowOpen = false; +boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; -final LeftOrRightValue value = next.value; -final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; -// Skip next records if window has not closed +// Skip next records if window has not closed yet +// We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { -outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side -} else { -outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side -} -if (outerJoinLeftBreak && outerJoinRightBreak) { -break; // there are no more candidates to emit on left-outerJoin-side and -// right-outerJoin-side +outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { -continue; // there are possibly candidates left on the other outerJoin-side +outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side } } -final VOut nullJoinedValue; -if (isLeftSide) { -nullJoinedValue = joiner.apply(key, 
-value.getLeftValue(), -value.getRightValue()); -} else { -nullJoinedValue = joiner.apply(key, -(V1) value.getRightValue(), -(V2) value.getLeftValue()); +if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { +// if windows are open for both joinSides we can break since there are no more candidates to emit +break; +} else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) { +// else if window is open only for this joinSide we continue with the next outer record +continue; } -context().forward( - record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) -); - -if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) { -// blind-delete the previous key from the outer window store now it is emitted; -// we do this because this delete would remove the whole list of values of the same key, -// and hence if we delete eagerly and then fail, we would miss emitting join results of the later -// values in the list. -// we do not use delete() calls since it would incur extra get() -store.put(prevKey, null); +final K key = timestampedKeyAndJoinSide.getKey(); +final LeftOrRightValue leftOrRightValue = next.value; +final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue); +if (nullJoinedValue != null) { Review Comment: Don't know. I guess null-checks are the default in my system ;-). Removed the null check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527968309

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ##
@@ -727,7 +801,7 @@ public void testWindowing() {
     }
     @Test
-    public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+    public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {

Review Comment:
OK, sorry. I can revert this, of course.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1528271251

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ##
@@ -511,14 +511,88 @@ public void testGracePeriod() {
             // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
             );
         }
     }
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream stream1;
+        final KStream stream2;
+        final KStream joined;
+        final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor processor = supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the other stream; this should produce no joined-items because there are no joins

Review Comment:
Good idea. I added a step with a right-hand-side record at ts=37.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1528270350

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ##
@@ -511,14 +511,88 @@ public void testGracePeriod() {
             // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
             );
         }
     }
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream stream1;
+        final KStream stream2;
+        final KStream joined;
+        final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor processor = supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+

Review Comment:
Good idea. I added a step with a right-hand-side record at ts=36.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1527980016 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftWindowOpen = false; +boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; -final LeftOrRightValue value = next.value; -final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; -// Skip next records if window has not closed +// Skip next records if window has not closed yet +// We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { -outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side -} else { -outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side -} -if (outerJoinLeftBreak && outerJoinRightBreak) { -break; // there are no more candidates to emit on left-outerJoin-side and -// right-outerJoin-side +outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { -continue; // there are possibly candidates left on the other outerJoin-side +outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side } } -final VOut nullJoinedValue; -if (isLeftSide) { -nullJoinedValue = joiner.apply(key, 
-value.getLeftValue(), -value.getRightValue()); -} else { -nullJoinedValue = joiner.apply(key, -(V1) value.getRightValue(), -(V2) value.getLeftValue()); +if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { Review Comment: Yep, much simpler now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1527978718 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftWindowOpen = false; +boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; -final LeftOrRightValue value = next.value; -final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; -// Skip next records if window has not closed +// Skip next records if window has not closed yet +// We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { -outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side -} else { -outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side -} -if (outerJoinLeftBreak && outerJoinRightBreak) { -break; // there are no more candidates to emit on left-outerJoin-side and -// right-outerJoin-side +outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side Review Comment: Yes, much simpler now. 
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftWindowOpen = false; +boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; -final LeftOrRightValue value = next.value; -final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; -// Skip next records if window has not closed +// Skip next records if window has not closed yet +// We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { -outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side -} else { -outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side -} -if (outerJoinLeftBreak && outerJoinRightBreak) { -break; // there are no more candidates to emit on left-outerJoin-side and -// right-outerJoin-side +outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { -continue; // there are possibly candidates left on the other outerJoin-side +outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side Review Comment: Indeed. -- This is an automated message from the Apache Git Service. To respond to the
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
mjsax commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1527393681 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -727,7 +801,7 @@ public void testWindowing() { } @Test -public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { +public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { Review Comment: For this PR, let it be, but for the future, please avoid unnecessary re-naming. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftWindowOpen = false; +boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; -final LeftOrRightValue value = next.value; -final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; -// Skip next records if window has not closed +// Skip next records if window has not closed yet +// We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { -outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side -} else { -outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side -} -if (outerJoinLeftBreak && outerJoinRightBreak) { -break; // there are no more candidates to emit on left-outerJoin-side and -// 
right-outerJoin-side +outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { -continue; // there are possibly candidates left on the other outerJoin-side +outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side } } -final VOut nullJoinedValue; -if (isLeftSide) { -nullJoinedValue = joiner.apply(key, -value.getLeftValue(), -value.getRightValue()); -} else { -nullJoinedValue = joiner.apply(key, -(V1) value.getRightValue(), -(V2) value.getLeftValue()); +if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { +// if windows are open for both joinSides we can break since there are no more candidates to emit +break; +} else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) { +// else if window is open only for this joinSide we continue with the next outer record +continue; } -context().forward( - record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) -); - -if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) { -// blind-delete the previous key from the outer window store now it is emitted; -// we do this because this delete would remove the whole list of values of the same key, -// and hence if we delete eagerly and then fail, we would miss emitting join results of the later -// values in the list. -// we do not use delete() calls since it would incur extra get() -store.put(prevKey, null); +final K key = timestampedKeyAndJoinSide.getKey(); +
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1525144034

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
         try (final KeyValueIterator, LeftOrRightValue> it = store.all()) {
             TimestampedKeyAndJoinSide prevKey = null;
+            boolean outerJoinLeftBreak = false;
+            boolean outerJoinRightBreak = false;
             while (it.hasNext()) {
-                boolean outerJoinLeftBreak = false;
-                boolean outerJoinRightBreak = false;

Review Comment:
I modified the emitNonJoinedOuterRecords method to handle asymmetric windows and added a unit test for asymmetric windows to KStreamKStreamOuterJoinTest.java. The test fails if we break at the first timestamp that is too large.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1523802443

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
         try (final KeyValueIterator, LeftOrRightValue> it = store.all()) {
             TimestampedKeyAndJoinSide prevKey = null;
+            boolean outerJoinLeftBreak = false;
+            boolean outerJoinRightBreak = false;
             while (it.hasNext()) {
-                boolean outerJoinLeftBreak = false;
-                boolean outerJoinRightBreak = false;

Review Comment:
Good point about asymmetric windows. Right now, the `KStreamKStreamJoinProcessor` does not know whether it's a left-outer or full-outer join. However, we do know this inside `KStreamImplJoin`, which is the only place where we create `KStreamKStreamJoin`, so it should be easy to pass this information into the processor :)
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1522948296

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
         try (final KeyValueIterator, LeftOrRightValue> it = store.all()) {
             TimestampedKeyAndJoinSide prevKey = null;
+            boolean outerJoinLeftBreak = false;
+            boolean outerJoinRightBreak = false;
             while (it.hasNext()) {
-                boolean outerJoinLeftBreak = false;
-                boolean outerJoinRightBreak = false;

Review Comment:
Ok, that's good. But we had better not break the loop at the first timestamp that is too large. For instance, when the first timestamp that is too large belongs to a left record, there can still be a right record whose timestamp is not yet too large. This is because left and right records can have different (asymmetric, 0-sized) window sizes.

So I would only break the loop once we have found the first too-large timestamp for a record of one side and then a too-large timestamp for a record of the other side. Unfortunately, this means that in the case of a left-join call, only right records will be found in the outer-join store, and the loop will not break because it keeps looking for too-large timestamps of left records, which do not exist.

If, however, we can find out that we are processing records for a left-join call (and not an outer-join call), we could break the loop immediately at the first record whose timestamp is too large. What do you think?
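The two-flag idea from this comment can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the Kafka Streams implementation: `OuterRecord`, `emitNonJoined`, and the per-side look-back parameters are hypothetical names, the input list is assumed to be sorted by timestamp, and joined values are rendered as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the loop discussed above; not the actual Kafka Streams code.
final class AsymmetricOuterJoinSketch {

    /** Hypothetical stand-in for one entry of the outer-join store. */
    static final class OuterRecord {
        final long timestamp;
        final boolean leftSide;
        final String value;

        OuterRecord(final long timestamp, final boolean leftSide, final String value) {
            this.timestamp = timestamp;
            this.leftSide = leftSide;
            this.value = value;
        }
    }

    /** Emits records whose window has closed; assumes the input is sorted by timestamp. */
    static List<String> emitNonJoined(final List<OuterRecord> sortedByTimestamp,
                                      final long leftLookBackMs,
                                      final long rightLookBackMs,
                                      final long graceMs,
                                      final long streamTime) {
        final List<String> emitted = new ArrayList<>();
        // The flags live outside the loop so that they survive across iterations.
        boolean leftWindowOpen = false;
        boolean rightWindowOpen = false;
        for (final OuterRecord r : sortedByTimestamp) {
            final long lookBackMs = r.leftSide ? leftLookBackMs : rightLookBackMs;
            if (r.timestamp + lookBackMs + graceMs >= streamTime) {
                if (r.leftSide) {
                    leftWindowOpen = true;
                } else {
                    rightWindowOpen = true;
                }
            }
            if (leftWindowOpen && rightWindowOpen) {
                break; // both sides have reached an open window: nothing left to emit
            }
            if (r.leftSide ? leftWindowOpen : rightWindowOpen) {
                continue; // only this record's side is open: keep scanning the other side
            }
            emitted.add(r.leftSide ? r.value + "+null" : "null+" + r.value);
        }
        return emitted;
    }
}
```

With illustrative look-back values of 20 ms (left) and 5 ms (right), no grace period, and a stream time of 37, left records at timestamps 29 and 30 are skipped while a right record at timestamp 31 is still emitted. Breaking at the first open window would have missed that right record.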
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1522304963

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
         try (final KeyValueIterator, LeftOrRightValue> it = store.all()) {
             TimestampedKeyAndJoinSide prevKey = null;
+            boolean outerJoinLeftBreak = false;
+            boolean outerJoinRightBreak = false;
             while (it.hasNext()) {
-                boolean outerJoinLeftBreak = false;
-                boolean outerJoinRightBreak = false;

Review Comment:
> Will try to make a unit-test for this.

Thanks! That's awesome!

> The documentation says that the ordering of the KeyValueIterator is NOT guaranteed.

But this is internal, right? This helper store is not pluggable, so I think we could rely on the ordering. We do store the data (i.e., the key) as ``, so we should be able to break the loop when we get the first timestamp that is too large (regardless of whether it's a left or right record). Cf. `TimestampedKeyAndJoinSideSerializer.java`.
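Why a timestamp-first key layout would make iteration order follow timestamps can be shown with a tiny sketch. The layout below (8-byte big-endian timestamp, one side byte, then the key bytes) is an assumption made for illustration only, since the comment above does not spell out the exact format; `encode` and `compare` are hypothetical helpers, and timestamps are assumed to be non-negative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: an assumed timestamp-prefixed key layout, not Kafka's serializer.
final class TimestampPrefixedKeySketch {

    /** Encodes timestamp (big-endian), then a side marker, then the UTF-8 key bytes. */
    static byte[] encode(final long timestamp, final boolean leftSide, final String key) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + 1 + keyBytes.length)
            .putLong(timestamp)               // ByteBuffer writes big-endian by default
            .put((byte) (leftSide ? 1 : 0))
            .put(keyBytes)
            .array();
    }

    /** Unsigned lexicographic comparison, the order a byte-sorted store would use. */
    static int compare(final byte[] a, final byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }
}
```

Because the timestamp occupies the leading bytes, a byte-wise sorted store visits entries in timestamp order regardless of join side or key, which is what allows a loop to stop early once the timestamps become too large.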
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1521827156

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
     try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
         TimestampedKeyAndJoinSide<K> prevKey = null;
+        boolean outerJoinLeftBreak = false;
+        boolean outerJoinRightBreak = false;
         while (it.hasNext()) {
-            boolean outerJoinLeftBreak = false;
-            boolean outerJoinRightBreak = false;

Review Comment:
Currently, the while-loop of the emitNonJoinedOuterRecords() method iterates over ALL the left- and right-side outer-join records in the outer-join store until it hits the **break;**. The idea of the two outerJoinBreak flags was to keep track of the point from which the windows of the outer-join records are no longer closed, but this is only useful if the KeyValueIterator is ordered by timestamped key, and it is not: the documentation says that the ordering of the KeyValueIterator is NOT guaranteed.

So now I think we had better remove the outerJoinBreak flags and simply check, for each outer-join record, whether it belongs to a closed window, without any optimization. If the window has closed, we can emit a nullJoinedValue. If the window has not closed yet, we can continue with the next outer-join record.

What do you think? @mjsax @florin-akermann
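The unoptimized variant proposed in this comment, which checks every record and never breaks, could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: `StoreEntry`, `closedEntries`, and the parameter names are hypothetical, and a returned entry stands in for emitting a null-joined value.

```java
import java.util.ArrayList;
import java.util.List;

class PerRecordWindowCheckSketch {
    // Hypothetical stand-in for an outer-join store entry: timestamp plus join side.
    record StoreEntry(long timestamp, boolean leftSide) {}

    // Checks every entry on its own, so the result does not depend on iteration order.
    static List<StoreEntry> closedEntries(List<StoreEntry> anyOrder, long streamTime,
                                          long leftLookBackMs, long rightLookBackMs, long graceMs) {
        List<StoreEntry> closed = new ArrayList<>();
        for (StoreEntry e : anyOrder) {
            long lookBack = e.leftSide() ? leftLookBackMs : rightLookBackMs;
            boolean windowStillOpen = e.timestamp() + lookBack + graceMs >= streamTime;
            if (windowStillOpen) {
                continue; // not a candidate yet, move on to the next entry
            }
            closed.add(e); // window closed: a null-joined value would be emitted here
        }
        return closed;
    }
}
```

The trade-off is a full scan of the store on every call in exchange for not relying on any iterator ordering.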
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1521008015

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
     try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
         TimestampedKeyAndJoinSide<K> prevKey = null;
+        boolean outerJoinLeftBreak = false;
+        boolean outerJoinRightBreak = false;
         while (it.hasNext()) {
-            boolean outerJoinLeftBreak = false;
-            boolean outerJoinRightBreak = false;

Review Comment:
Yep, with an outer-join store in which left-side and right-side records are mixed together, I think too many null-joined records might be emitted, namely for windows that have not closed yet.
So it is even more complex than I thought.
Will try to make a unit test for this.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1521008015

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
     try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
         TimestampedKeyAndJoinSide<K> prevKey = null;
+        boolean outerJoinLeftBreak = false;
+        boolean outerJoinRightBreak = false;
         while (it.hasNext()) {
-            boolean outerJoinLeftBreak = false;
-            boolean outerJoinRightBreak = false;

Review Comment:
Yep, with an outer-join store in which left-side and right-side records are mixed together, I think too many null-joined records could be emitted, namely for windows that have not closed yet.
I don't think there is a unit test for this yet.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1520612406

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
     try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
         TimestampedKeyAndJoinSide<K> prevKey = null;
+        boolean outerJoinLeftBreak = false;
+        boolean outerJoinRightBreak = false;
         while (it.hasNext()) {
-            boolean outerJoinLeftBreak = false;
-            boolean outerJoinRightBreak = false;

Review Comment:
Oops... Wondering why we did not catch this in the unit tests?