Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1749965067

Thanks for the PR. I did not forget about it (sorry for the wait; very busy times...).

> Moved the "emit non-joined items"-logic after the "joined items"-logic instead of before, because only then you know whether to emit or not.

Can you elaborate? Not sure if I can follow?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1348235096

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##

@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();

-// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join;
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-    new KeyValueTimestamp<>(0, "A0+null", 0L),
-    new KeyValueTimestamp<>(1, "A1+a1", 110L)
+    new KeyValueTimestamp<>(1, "A1+a1", 110L),
+    new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
It seems your change to push out the "left/outer" join result at the end of `process()` instead of at the beginning leads to this reversed output, which implies we now emit data out-of-order. Hence, it seems better to keep the code as-is?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1348236925

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ##

@@ -790,9 +790,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
 time += 1;
 inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);

-processor.checkAndClearProcessResult(
-    new KeyValueTimestamp<>(1, "null+a1", 102L)
-);
+processor.checkAndClearProcessResult();

Review Comment:
Oh dear... the comment already says "should not produce any items"... Embarrassing that we still did not catch this bug originally...
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1348237510

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ##

@@ -878,7 +878,9 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 processor.checkAndClearProcessResult();

-// push one item to the first stream; this should produce one full-join item
+// push one item to the first stream;
+// this should produce one full-join item;

Review Comment:
I just realized that it should be `inner-join item` (not `full-join item`) -- could we fix this across the board in this PR as a side cleanup?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1750158783

> Thanks for the PR. I did not forget about it (sorry for the wait; very busy times...).
>
> > Moved the "emit non-joined items"-logic after the "joined items"-logic instead of before, because only then you know whether to emit or not.
>
> Can you elaborate? Not sure if I can follow?

Sure. If a non-joined record is emitted at window close, but a joined record is also available at that time, then both the non-joined and the joined record would be emitted, wouldn't they? In the joined-item logic, the non-joined record in the outerJoinStore is (correctly) nullified, but by then it has already been emitted. Or am I mistaken?
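
The concern raised in this comment can be illustrated with a toy model. This is a minimal sketch, not Kafka Streams code: the class `EmitBeforeJoinSketch`, its `process` method, and the plain maps standing in for the window store and the outer-join store are all hypothetical. It takes the premise of the comment as given (a record whose window counts as closed and that also finds a join partner in the same call) and does not claim that this situation arises in the real processor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only; every name here is hypothetical.
final class EmitBeforeJoinSketch {

    // Processes one right-side record and returns what gets forwarded.
    // leftWindow stands in for the other side's window store; closedKeys marks
    // which left records are treated as having a closed window.
    static List<String> process(final Map<String, String> leftWindow,
                                final Set<String> closedKeys,
                                final String key,
                                final String rightValue,
                                final boolean emitNonJoinedFirst) {
        // Every left record starts out as a non-joined candidate in the toy outer store.
        final Map<String, String> outerStore = new LinkedHashMap<>(leftWindow);
        final List<String> out = new ArrayList<>();
        if (emitNonJoinedFirst) {
            emitClosed(outerStore, closedKeys, out);
        }
        final String leftValue = leftWindow.get(key);
        if (leftValue != null) {
            outerStore.remove(key); // the join clears the non-joined candidate
            out.add(leftValue + "+" + rightValue);
        }
        if (!emitNonJoinedFirst) {
            emitClosed(outerStore, closedKeys, out);
        }
        return out;
    }

    // Emits and removes every candidate whose window is treated as closed.
    private static void emitClosed(final Map<String, String> outerStore,
                                   final Set<String> closedKeys,
                                   final List<String> out) {
        final Iterator<Map.Entry<String, String>> it = outerStore.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (closedKeys.contains(entry.getKey())) {
                out.add(entry.getValue() + "+null");
                it.remove();
            }
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> left = Collections.singletonMap("1", "A1");
        final Set<String> closed = Collections.singleton("1");
        System.out.println(process(left, closed, "1", "a1", true));  // emit phase first
        System.out.println(process(left, closed, "1", "a1", false)); // join phase first
    }
}
```

In this toy, running the emit phase first forwards both `A1+null` and `A1+a1`, while running the join phase first forwards only `A1+a1`, because the join has already removed the candidate by the time the emit phase looks at the store.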
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1353117588

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##

@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
         }
     }

+    @Test
+    public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            processor.init(null);
+            // push four items with increasing timestamps to the primary stream; this should emit null-joined items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            // w2 = {}
+            final long time = 1000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], time + i);
+            }
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "B0+null", 1000L),
+                new KeyValueTimestamp<>(1, "B1+null", 1001L),
+                new KeyValueTimestamp<>(2, "B2+null", 1002L)
+            );
+        }

Review Comment:
Noob question: why do we have output here? The time difference is `100ms`, should we only output these three if we got an event with time `1103`? Maybe I'm missing something

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##

@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
         }
     }

+    @Test
+    public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockAp
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1358134854

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##

@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
         }
     }

+    @Test
+    public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            processor.init(null);
+            // push four items with increasing timestamps to the primary stream; this should emit null-joined items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            // w2 = {}
+            final long time = 1000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], time + i);
+            }
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "B0+null", 1000L),
+                new KeyValueTimestamp<>(1, "B1+null", 1001L),
+                new KeyValueTimestamp<>(2, "B2+null", 1002L)
+            );
+        }

Review Comment:
No, it is a left join with JoinWindows of 100 ms before and 0 ms after. So the non-joined records for B0, B1 and B2 are emitted directly, at their own timestamps.
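
The arithmetic behind this answer can be sketched as follows. This is a toy model under stated assumptions, not Kafka Streams code: `WindowCloseSketch` and its methods are hypothetical names, the right side never receives a record, and any emit-interval throttling is ignored. The only rule modelled is that a left-side record's window counts as closed once stream time is strictly greater than its timestamp plus the `after` interval plus the grace period, which mirrors the `>=` skip condition quoted elsewhere in this review.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model, not Kafka Streams code: all names here are hypothetical.
final class WindowCloseSketch {

    // A left-side record's window is treated as closed once stream time is
    // strictly greater than recordTs + afterMs + graceMs.
    static boolean isClosed(final long recordTs, final long afterMs, final long graceMs, final long streamTime) {
        return recordTs + afterMs + graceMs < streamTime;
    }

    // Pipes left-side timestamps in order (nothing arrives on the right side)
    // and returns the timestamps of the non-joined records emitted along the way.
    static List<Long> emittedNonJoined(final long[] leftTimestamps, final long afterMs, final long graceMs) {
        final List<Long> pending = new ArrayList<>();
        final List<Long> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (final long ts : leftTimestamps) {
            streamTime = Math.max(streamTime, ts); // stream time never moves backwards
            pending.add(ts);
            final Iterator<Long> it = pending.iterator();
            while (it.hasNext()) {
                final long candidate = it.next();
                if (isClosed(candidate, afterMs, graceMs, streamTime)) {
                    emitted.add(candidate);
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(final String[] args) {
        final long[] left = {1000L, 1001L, 1002L, 1003L};
        System.out.println(emittedNonJoined(left, 0L, 0L));   // after = 0 ms
        System.out.println(emittedNonJoined(left, 100L, 0L)); // after = 100 ms
    }
}
```

With `after = 0` the toy emits the records at 1000, 1001 and 1002 as soon as the next record advances stream time, and the record at 1003 stays pending, which matches the three expected results in the test. With `after = 100` nothing is emitted for the same input, and the record at 1002 would only close at stream time 1103, which is the case the reviewer had in mind.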
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1374947218

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;

 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
+if (sharedTimeTracker.minTime + windowsAfterIntervalMs + joinGraceMs >= sharedTimeTracker.streamTime) {

Review Comment:
I'm not sure about this part. If the `value` has a right value and this is on the right side, why don't we check against `windows.beforeMs` to see whether it has expired? I'm not sure why `windowsAfterIntervalMs` is always set to `windows.afterMs`.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1377564878

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;

 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
+if (sharedTimeTracker.minTime + windowsAfterIntervalMs + joinGraceMs >= sharedTimeTracker.streamTime) {

Review Comment:
I think you are right. Whether we should check with _windowsAfterMs_ or _windowsBeforeMs_ depends on the side of the outerJoin-value. Since the outerJoin-store can contain both left-sided and right-sided outerJoin-values, we should check with _windowsAfterMs_ when the outerJoin-value is left-sided and with _windowsBeforeMs_ when it is right-sided.

Also, we cannot break the while-loop in emitNonJoinedOuterRecords until we are completely sure that there are no more left-sided or right-sided outerJoin-values available to emit. For example, if we find that we can skip a left-sided outerJoin-value because its window has not yet closed, there can still be a right-sided outerJoin-value that must be emitted.

In the future, some leftSide/rightSide flags could perhaps be introduced that indicate whether we have put left-sided or right-sided outerJoin-values in the outerJoin-store, so that with a leftJoin() we could break the while-loop earlier to save time.
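
The side-dependent check described in this comment can be sketched as a small predicate. This is a minimal illustration under assumptions, not the code in the PR: `SideAwareCloseSketch` and `windowClosed` are hypothetical names, and the timestamps are made up. A left-sided value waits out the `after` interval, a right-sided value waits out the `before` interval.

```java
// Toy model, not Kafka Streams code: names and numbers are hypothetical.
final class SideAwareCloseSketch {

    // Picks the interval by the side the stored value came from, then applies
    // the same "strictly past the window end plus grace" rule to both sides.
    static boolean windowClosed(final boolean leftSided, final long recordTs,
                                final long beforeMs, final long afterMs,
                                final long graceMs, final long streamTime) {
        final long intervalMs = leftSided ? afterMs : beforeMs;
        return recordTs + intervalMs + graceMs < streamTime;
    }

    public static void main(final String[] args) {
        // before = 0 ms, after = 100 ms, no grace, stream time = 50
        System.out.println(windowClosed(true, 10L, 0L, 100L, 0L, 50L));  // left-sided value
        System.out.println(windowClosed(false, 10L, 0L, 100L, 0L, 50L)); // right-sided value
    }
}
```

With these numbers the left-sided value at timestamp 10 is still open while the right-sided value at the same timestamp is already closed. That is the situation the comment warns about: a loop that stops at the first still-open left-sided value would miss a right-sided value that is ready to be emitted.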
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1389220068

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##

@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();

-// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join;
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-    new KeyValueTimestamp<>(0, "A0+null", 0L),
-    new KeyValueTimestamp<>(1, "A1+a1", 110L)
+    new KeyValueTimestamp<>(1, "A1+a1", 110L),
+    new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
Can you point me to the ordering rules for left, outer and inner joins? Then I can have a look.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1397715022

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -104,20 +103,16 @@ public void init(final ProcessorContext context) {
 internalProcessorContext = (InternalProcessorContext) context;

 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
-droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(),
+metrics);

Review Comment:
ditto

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##

@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();

-// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join;
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-    new KeyValueTimestamp<>(0, "A0+null", 0L),
-    new KeyValueTimestamp<>(1, "A1+a1", 110L)
+    new KeyValueTimestamp<>(1, "A1+a1", 110L),
+    new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
This is because previously we looked at the outer store first and then joined, whereas with this change we join first and then look at the outer store. The timestamps in the outer store and the other store are hard to reason about. If we changed the ts of 0 to 100 and the ts of 1 to 50, the original test would still produce 0 first, which has the larger ts... So unless we compare the ts of the join and outer results at the same time when we output, we can't guarantee the order of ts in the output.

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -60,24 +61,21 @@ class KStreamKStreamJoin implements ProcessorSupplier joiner,
- final boolean outer,
- final Optional outerJoinWindowName,
- final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner, final boolean outer,
+final Optional outerJoinWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier) {

Review Comment:
Change this back? I think for Kafka Streams, the convention is to align with the first param's indentation.

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -165,40 +155,52 @@ public void process(final Record record) {
 // problem:
 //
 // Say we have a window size of 5 seconds
-// 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
-// The record is not processed yet, and is added to the outer-join store
-// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
-// The record is not processed yet, and is added to the outer-join store
-// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
-// It is time to look at the expired records. T10 and T2 should be emitted, but
-// because T2 was late, then it is not fetched by the window store, so it is not processed
+// 1. A non-joined record with time T10 is seen in the left-topic
+// (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to the outer-join store
+// 2. A non-joined record with time T2 is seen in the right-topic
+// (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to the outer-join store
+// 3. A joined record with time T11 is seen in the left-topic
+// (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and T2 should be
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
guozhangwang commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1817710512

Also made a very quick pass, and I think the fix is spot on. It would be great to get this merged sooner.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399179893

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -60,24 +61,21 @@ class KStreamKStreamJoin implements ProcessorSupplier joiner,
- final boolean outer,
- final Optional outerJoinWindowName,
- final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner, final boolean outer,
+final Optional outerJoinWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier) {

Review Comment:
changes reverted

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -94,7 +92,8 @@ public Processor get() {
 private class KStreamKStreamJoinProcessor extends ContextualProcessor {
 private WindowStore otherWindowStore;
 private Sensor droppedRecordsSensor;
-private Optional, LeftOrRightValue>> outerJoinStore = Optional.empty();
+private Optional, LeftOrRightValue>> outerJoinStore = Optional
+.empty();

Review Comment:
changes reverted
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399180276

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -104,20 +103,16 @@ public void init(final ProcessorContext context) {
 internalProcessorContext = (InternalProcessorContext) context;

 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
-droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(),
+metrics);

Review Comment:
changes reverted

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##

@@ -104,20 +103,16 @@ public void init(final ProcessorContext context) {
 internalProcessorContext = (InternalProcessorContext) context;

 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
-droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(),
+metrics);
 otherWindowStore = context.getStateStore(otherWindowName);
 sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId());

 if (enableSpuriousResultFix) {
 outerJoinStore = outerJoinWindowName.map(context::getStateStore);

-sharedTimeTracker.setEmitInterval(
-StreamsConfig.InternalConfig.getLong(
-context.appConfigs(),
- EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
-1000L
-)
-);
+ sharedTimeTracker.setEmitInterval(StreamsConfig.InternalConfig.getLong(context.appConfigs(),
+ EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 1000L));

Review Comment:
changes reverted
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1399180900 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -134,29 +129,24 @@ public void process(final Record record) { final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); - -// Emit all non-joined records which window has closed -if (inputRecordTimestamp == sharedTimeTracker.streamTime) { -outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record)); -} try (final WindowStoreIterator iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue otherRecord = iter.next(); final long otherRecordTimestamp = otherRecord.key; -outerJoinStore.ifPresent(store -> { -// use putIfAbsent to first read and see if there's any values for the key, -// if yes delete the key, otherwise do not issue a put; -// we may delete some values with the same key early but since we are going -// range over all values of the same key even after failure, since the other window-store -// is only cleaned up by stream time, so this is okay for at-least-once. - store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null); -}); - -context().forward( -record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value)) - .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); +outerJoinStore.ifPresent(store -> +// Use putIfAbsent to first read and see if there's any values for the key, +// if yes delete the key, otherwise do not issue a put; +// we may delete some values with the same key early but since we are going +// range over all values of the same key even after failure, since the other +// window-store +// is only cleaned up by stream time, so this is okay for at-least-once. 
+ store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), +null)); + + context().forward(record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value)) +.withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); Review Comment: changes reverted ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -165,40 +155,52 @@ public void process(final Record record) { // problem: // // Say we have a window size of 5 seconds -// 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10) -// The record is not processed yet, and is added to the outer-join store -// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) -// The record is not processed yet, and is added to the outer-join store -// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) -// It is time to look at the expired records. T10 and T2 should be emitted, but -// because T2 was late, then it is not fetched by the window store, so it is not processed +// 1. A non-joined record with time T10 is seen in the left-topic +// (maxLeftStreamTime: 10) +// The record is not processed yet, and is added to the outer-join store +// 2. A non-joined record with time T2 is seen in the right-topic +// (maxRightStreamTime: 2) +// The record is not processed yet, and is added to the outer-join store +// 3. A joined record with time T11 is seen in the left-topic +// (maxLeftStreamTime: 11) +// It is time to look at the expired records. T10 and T2 should be emitted, but +// because T2 was late, then it is not fetched by the window store, so it is not +// processed // // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399561823

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ##

@@ -791,7 +791,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
 inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);

 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(1, "null+a1", 102L)
+new KeyValueTimestamp<>(1, "null+a1", 102L)

Review Comment:
revert this?

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ##

@@ -878,16 +880,18 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 processor.checkAndClearProcessResult();

-// push one item to the first stream; this should produce one full-join item
+// push one item to the first stream;
+// this should produce one inner-join item;
+// and a right-joined item for a3
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
 // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
-// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 }
+// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 time += 100;
 inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);

 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(2, "A2+a2", 201L)
+new KeyValueTimestamp<>(2, "A2+a2", 201L)

Review Comment:
ditto
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1397782005

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##

@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();

-// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join;
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-    new KeyValueTimestamp<>(0, "A0+null", 0L),
-    new KeyValueTimestamp<>(1, "A1+a1", 110L)
+    new KeyValueTimestamp<>(1, "A1+a1", 110L),
+    new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
This is because previously we looked at the outer store first and then joined, whereas with this change we join first and then look at the outer store. The timestamps in the outer store and the other store are hard to reason about. If we changed the ts of 0 to 100 and the ts of 1 to 50, the original test would still produce 0 first, which has the larger ts... So unless we compare the ts of the join and outer results at the same time when we output, we can't guarantee the order of ts in the output.
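
The ordering argument in this comment can be sketched with plain lists of output timestamps. This is a toy illustration, not Kafka Streams code: `EmitOrderSketch` and its methods are hypothetical, the values 100 and 50 are picked only in the spirit of the variation described above, and the merge step is one conceivable reading of "compare the ts of the join and outer results at the same time", not something the PR implements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model, not Kafka Streams code: all names here are hypothetical.
final class EmitOrderSketch {

    // Forwards everything from the first phase, then everything from the second.
    static List<Long> concat(final List<Long> firstPhase, final List<Long> secondPhase) {
        final List<Long> out = new ArrayList<>(firstPhase);
        out.addAll(secondPhase);
        return out;
    }

    // Collects both phases first and forwards them sorted by timestamp.
    static List<Long> mergedByTimestamp(final List<Long> outerTs, final List<Long> joinTs) {
        final List<Long> out = concat(outerTs, joinTs);
        Collections.sort(out);
        return out;
    }

    static boolean isTimestampOrdered(final List<Long> ts) {
        for (int i = 1; i < ts.size(); i++) {
            if (ts.get(i - 1) > ts.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final List<Long> outer = Arrays.asList(0L);   // A0+null at ts 0
        final List<Long> join = Arrays.asList(110L);  // A1+a1 at ts 110
        System.out.println(concat(outer, join));      // outer store first, then join
        System.out.println(concat(join, outer));      // join first, then outer store
        // Illustrative timestamps where outer-first is out of order as well.
        System.out.println(concat(Arrays.asList(100L), Arrays.asList(50L)));
        System.out.println(mergedByTimestamp(Arrays.asList(100L), Arrays.asList(50L)));
    }
}
```

For the timestamps in `testOrdering`, emitting the outer store first happens to come out in timestamp order and emitting the join first does not. Once the non-joined result carries the larger timestamp, the outer-first order is out of order too, so neither fixed phase order gives a guarantee in this toy; only the merged variant does.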
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1400160757 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -791,7 +791,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(1, "null+a1", 102L) +new KeyValueTimestamp<>(1, "null+a1", 102L) Review Comment: Done ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -878,16 +880,18 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { processor.checkAndClearProcessResult(); -// push one item to the first stream; this should produce one full-join item +// push one item to the first stream; +// this should produce one inner-join item; +// and a right-joined item for a3 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) } -// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 } +// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) } time += 100; inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(2, "A2+a2", 201L) +new KeyValueTimestamp<>(2, "A2+a2", 201L) Review Comment: Done
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1400165113 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: Agreed
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1820473595

In the Javadoc of JoinWindows:

> There are three different window configuration supported:
> - before = after = time-difference
> - before = 0 and after = time-difference
> - before = time-difference and after = 0
>
> A join is symmetric in the sense, that a join specification on the first stream returns the same result record as a join specification on the second stream with flipped before and after values. Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller than lower-interval bound.

I think that with this change, **non-symmetric** values for the window configuration can now be supported as well.
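The symmetry property quoted from the Javadoc can be checked with a small stand-alone sketch. It does not use the Kafka Streams API: `JoinWindowSketch` and its methods are hypothetical, and it assumes the usual reading that a record of the other stream joins when its timestamp lies in the interval [ts - before, ts + after] around this stream's record.

```java
// Hypothetical illustration only; this is not the Kafka Streams JoinWindows class.
final class JoinWindowSketch {
    final long beforeMs;
    final long afterMs;

    JoinWindowSketch(final long beforeMs, final long afterMs) {
        // Assumed rule from the quoted Javadoc: no "inverse" window,
        // i.e. the upper bound (ts + after) must not be below the lower bound (ts - before).
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("inverse window: before + after must not be negative");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    // True if a record of the other stream with timestamp otherTs falls into the window of thisTs.
    boolean joins(final long thisTs, final long otherTs) {
        return otherTs >= thisTs - beforeMs && otherTs <= thisTs + afterMs;
    }

    // The same join as specified from the other stream's point of view.
    JoinWindowSketch flipped() {
        return new JoinWindowSketch(afterMs, beforeMs);
    }

    public static void main(final String[] args) {
        // before = 100 ms, after = 0: the configuration used in the new tests of this PR.
        final JoinWindowSketch window = new JoinWindowSketch(100L, 0L);
        System.out.println(window.joins(1000L, 950L));            // true: 950 is within [900, 1000]
        System.out.println(window.joins(1000L, 1001L));           // false: 1001 is after the window
        System.out.println(window.flipped().joins(950L, 1000L));  // true: same pair seen from the other side
    }
}
```

Flipping `before` and `after` and swapping the two timestamps always gives the same answer, which is the symmetry the Javadoc describes; nothing in this check requires `before` and `after` to be equal or one of them to be zero.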
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1850672002 @mjsax, @guozhangwang, can we merge this?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1974072824 @VictorvandenHoven -- it seems `KStreamKStreamIntegrationTest.shouldOuterJoin` fails consistently. Can you take a look?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1976647985

> @VictorvandenHoven -- it seems `KStreamKStreamIntegrationTest.shouldOuterJoin` fails consistently. Can you take a look?

Ouch, I didn't test that one. Apparently, `internalProcessorContext.currentSystemTimeMs()` behaves differently than expected: **MOCK_TIME** advances by 10 seconds, while **internalProcessorContext.currentSystemTimeMs()** only advances by about 100 ms. Since the "next time to emit" interval is 1000 ms by default, the "next time to emit" is never reached. When I set the interval to 0 by adding the following line to the `@BeforeEach` method, the test succeeds: `streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);` However, I do not understand why this test worked before. Shall I change the test to set EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1977704070

Looking into the test, we create `new KafkaStreams(builder.build(streamsConfig), streamsConfig)`, but we don't pass in the mock time object. So KS creates its own `Time` object, and the two clocks are decoupled... But yes, setting `streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);` is a valid fix, and we do the same thing in other tests. In the end, this config is a perf optimization, and for testing we can disable it by setting it to zero.

> I do not understand however, why this test worked before?

Not 100% sure, TBH.
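The effect of the emit interval discussed above can be reproduced with a simplified model. This is a sketch under assumptions, not the actual `KStreamKStreamJoin` logic: `EmitThrottleSketch`, `mayEmit`, and `allowedScans` are hypothetical names, and the model only assumes that the outer store is scanned when the wall-clock time has reached a "next time to emit" that is pushed forward by the interval after each scan.

```java
// Hypothetical, simplified model of an emit-interval throttle; not Kafka Streams code.
final class EmitThrottleSketch {
    private final long emitIntervalMs;
    private long nextTimeToEmitMs = 0L;

    EmitThrottleSketch(final long emitIntervalMs) {
        this.emitIntervalMs = emitIntervalMs;
    }

    // Returns true if a scan of the outer store would be allowed at this wall-clock time.
    boolean mayEmit(final long systemTimeMs) {
        if (systemTimeMs < nextTimeToEmitMs) {
            return false; // throttled: the next emit time has not been reached yet
        }
        nextTimeToEmitMs = systemTimeMs + emitIntervalMs;
        return true;
    }

    // Counts how many of the given wall-clock instants would be allowed to scan.
    static int allowedScans(final long emitIntervalMs, final long[] systemTimesMs) {
        final EmitThrottleSketch throttle = new EmitThrottleSketch(emitIntervalMs);
        int allowed = 0;
        for (final long now : systemTimesMs) {
            if (throttle.mayEmit(now)) {
                allowed++;
            }
        }
        return allowed;
    }

    public static void main(final String[] args) {
        // The real clock only advances about 100 ms during the test, as observed in the thread.
        final long[] systemTimesMs = {0L, 50L, 100L};
        System.out.println(allowedScans(1000L, systemTimesMs)); // 1: only the very first call passes
        System.out.println(allowedScans(0L, systemTimesMs));    // 3: interval 0 disables the throttle
    }
}
```

In this model, advancing a separate mock clock has no influence on `mayEmit`, which matches the explanation that the test's mock time and the clock used by the running application are decoupled.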
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1511998434 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -438,13 +438,13 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records -// by the time they were produced before +// push one item to the other window that has a join; +// this should produce the joined record first; +// then the not-joined record Review Comment: This change to the comment needs to be rolled back, right? We indeed produce the left-null join result first. ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -884,11 +886,13 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { processor.checkAndClearProcessResult(); -// push one item to the first stream; this should produce one full-join item +// push one item to the first stream; +// this should produce one inner-join item; +// and a right-joined item for a3 Review Comment: We don't produce output for `a3` ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream stream1; +final KStream stream2; +final KStream joined; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.leftJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, 
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), +StreamJoined.with(Serdes.Integer(), +Serdes.String(), +Serdes.String()) +); +joined.process(supplier); + +final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor processor = supplier.theCapturedProcessor(); + +processor.init(null); +// push four items with increasing timestamps to the primary stream; this should emit null-joined items +// w1 = {} +// w2 = {} +// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) } Review Comment: Why are we using `B` not `A` for left input? ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { Review Comment: Why do we need this test case? Seems it's fully contained in `testLeftJoinedRecordsWithZeroAfterAreEmitted` below? 
## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream stream1; +final KStream stream2; +final KStream joined; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.strea
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512980857 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream stream1; +final KStream stream2; +final KStream joined; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.leftJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), +StreamJoined.with(Serdes.Integer(), +Serdes.String(), +Serdes.String()) +); +joined.process(supplier); + +final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor processor = supplier.theCapturedProcessor(); + +processor.init(null); +// push four items with increasing timestamps to the primary stream; this should emit null-joined items +// w1 = {} +// w2 = {} +// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) } Review Comment: 
Changed B into A
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512981646 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream stream1; +final KStream stream2; +final KStream joined; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.leftJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), +StreamJoined.with(Serdes.Integer(), +Serdes.String(), +Serdes.String()) +); +joined.process(supplier); + +final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor processor = supplier.theCapturedProcessor(); + +processor.init(null); +// push four items with increasing timestamps to the primary stream; this should emit null-joined items Review Comment: modified comment
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512982523 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { Review Comment: Correct, removed this test.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512983679 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -438,13 +438,13 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records -// by the time they were produced before +// push one item to the other window that has a join; +// this should produce the joined record first; +// then the not-joined record Review Comment: Modified the comments according to the results produced.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512985080 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -884,11 +886,13 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { processor.checkAndClearProcessResult(); -// push one item to the first stream; this should produce one full-join item +// push one item to the first stream; +// this should produce one inner-join item; +// and a right-joined item for a3 Review Comment: Removed the line in the comment that says we produce output for a3.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax merged PR #14426: URL: https://github.com/apache/kafka/pull/14426
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1979898854 Thanks for the fix! Merged to `trunk`. Really appreciate that you pushed this through. It was more complicated than expected and took way too long to get finished.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
florin-akermann commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1518681146 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords( sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { -break; +final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); +if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (timestampedKeyAndJoinSide.isLeftSide()) { +outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side +} else { +outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side +} +if (outerJoinLeftBreak && outerJoinRightBreak) { Review Comment: Hi, it seems that `outerJoinLeftBreak && outerJoinRightBreak` is always false. Doesn't this break the behavior described in the comment at the top of this block (`// Skip next records if window has not closed`)?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1518866473 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords( sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { -break; +final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); +if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (timestampedKeyAndJoinSide.isLeftSide()) { +outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side +} else { +outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side +} +if (outerJoinLeftBreak && outerJoinRightBreak) { Review Comment: Probably these two lines need to be outside the while loop:

```
boolean outerJoinLeftBreak = false;
boolean outerJoinRightBreak = false;
```
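The difference between declaring the two flags inside and outside the loop can be shown with a stand-alone sketch. It is a simplified model of the quoted diff, not the actual `emitNonJoinedOuterRecords` method: `OuterJoinBreakSketch`, `Entry`, and `visited` are hypothetical names, grace time is left out, and resetting the flags at the start of each iteration stands in for declaring them inside the `while` loop.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the loop in the quoted diff; not Kafka Streams code.
final class OuterJoinBreakSketch {

    // Stand-in for an outer-store entry: which side it came from and its timestamp.
    static final class Entry {
        final boolean leftSide;
        final long timestamp;

        Entry(final boolean leftSide, final long timestamp) {
            this.leftSide = leftSide;
            this.timestamp = timestamp;
        }
    }

    // Returns how many entries the loop visits before it stops.
    static int visited(final List<Entry> entries,
                       final long streamTime,
                       final long leftLookBackMs,
                       final long rightLookBackMs,
                       final boolean flagsDeclaredInsideLoop) {
        boolean leftBreak = false;
        boolean rightBreak = false;
        int visited = 0;
        for (final Entry entry : entries) {
            if (flagsDeclaredInsideLoop) {
                // Mimics re-declaring both flags in every iteration.
                leftBreak = false;
                rightBreak = false;
            }
            visited++;
            final long lookBackMs = entry.leftSide ? leftLookBackMs : rightLookBackMs;
            if (entry.timestamp + lookBackMs >= streamTime) {
                // Window has not closed yet for this side.
                if (entry.leftSide) {
                    leftBreak = true;
                } else {
                    rightBreak = true;
                }
                if (leftBreak && rightBreak) {
                    break; // no more candidates on either side
                }
                continue;
            }
            // Window closed: the real code would emit the non-joined record here.
        }
        return visited;
    }

    public static void main(final String[] args) {
        final List<Entry> entries = Arrays.asList(
            new Entry(true, 100L), new Entry(false, 100L),
            new Entry(true, 101L), new Entry(false, 101L),
            new Entry(true, 102L));
        System.out.println(visited(entries, 100L, 0L, 0L, true));  // 5: the break is never reached
        System.out.println(visited(entries, 100L, 0L, 0L, false)); // 2: stops once both sides are done
    }
}
```

With the flags reset on every iteration, at most one of them can be true at the `break` check, so the loop always scans the whole store; hoisting them lets the scan stop as soon as both sides have reported an open window.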
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1520201655 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords( sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { -break; +final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); +if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (timestampedKeyAndJoinSide.isLeftSide()) { +outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side +} else { +outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side +} +if (outerJoinLeftBreak && outerJoinRightBreak) { Review Comment: See: https://github.com/apache/kafka/pull/15510
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1880585360

> @mjsax , @guozhangwang , can we merge this?

Since it has been a couple of months, I suppose it will not be merged then? Can we discuss this?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1900331266

> @mjsax , @guozhangwang , can we merge this?

How long does it normally take to get a reaction?
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1474063977 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: What we have seen is that you can emit both a null-joined record and a joined record for the same input if you first check the outerjoinstore (via emitNonJoinedOuterRecords()) and only after that look for a matching record in the otherWindowStore. This behaviour is not what you want. Therefore, you should first find out whether there is going to be a joined record, and if there is, you have to nullify the corresponding entry in the relevant outerjoinstore.
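The argument in the comment above can be illustrated with a small model. It is a sketch of the reasoning only, not of the merged implementation: `JoinBeforeOuterSketch` and its methods are hypothetical, plain maps stand in for the outer-join store and the other window store, and the model assumes that every entry still in the outer store is judged "window closed" at the moment it is checked (the premature-close situation the PR is about).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two processing orders; not Kafka Streams code.
final class JoinBeforeOuterSketch {

    // Emits every remaining outer-store entry as a null-joined result and clears the store.
    // Assumption: all remaining entries are treated as having a closed window.
    static void emitNonJoined(final Map<Integer, String> outerStore, final List<String> out) {
        for (final String leftValue : outerStore.values()) {
            out.add(leftValue + "+null");
        }
        outerStore.clear();
    }

    static List<String> process(final Map<Integer, String> outerStore,  // left records without a match so far
                                final Map<Integer, String> otherWindow, // left records inside the join window
                                final int key,
                                final String rightValue,
                                final boolean outerStoreFirst) {
        final List<String> out = new ArrayList<>();
        if (outerStoreFirst) {
            emitNonJoined(outerStore, out);
        }
        final String leftValue = otherWindow.get(key);
        if (leftValue != null) {
            out.add(leftValue + "+" + rightValue);
            outerStore.remove(key); // the record has joined, so it must not be emitted as non-joined
        }
        if (!outerStoreFirst) {
            emitNonJoined(outerStore, out);
        }
        return out;
    }

    static Map<Integer, String> storeWithA1() {
        final Map<Integer, String> store = new HashMap<>();
        store.put(1, "A1");
        return store;
    }

    public static void main(final String[] args) {
        // Outer store checked first: A1 is emitted twice, once as a spurious null-joined result.
        System.out.println(process(storeWithA1(), storeWithA1(), 1, "a1", true));  // [A1+null, A1+a1]
        // Join first: the outer-store entry is removed before it can be emitted.
        System.out.println(process(storeWithA1(), storeWithA1(), 1, "a1", false)); // [A1+a1]
    }
}
```

The sketch only reproduces the duplicate when the outer-store check considers the window closed too early; with a correct close condition, checking the outer store first does not emit the spurious result.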
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1474170865

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##
@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
         }
     }
 
+    @Test
+    public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            processor.init(null);
+            // push four items with increasing timestamps to the primary stream; this should emit null-joined items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+            // w2 = {}
+            final long time = 1000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], time + i);
+            }
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "B0+null", 1000L),
+                new KeyValueTimestamp<>(1, "B1+null", 1001L),
+                new KeyValueTimestamp<>(2, "B2+null", 1002L)
+            );
+        }
+    }
+
+    @Test
+    public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+        final int[] expectedKeysNotJoined = new int[] {10, 11, 12, 13};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(Duration.ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            processor.init(null);
+
+            // push four items with increasing timestamps to the primary stream; the other window is empty;
+            // this should emit the first three left-joined items;
+            // A3 is not triggered yet
+
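To make the window used in the quoted tests easier to picture, here is a minimal sketch of the join condition for `JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO)`. It assumes the usual reading of the window (a right record joins a left record when its timestamp is at most `beforeMs` earlier and at most `afterMs` later, both bounds inclusive). The class and method names are hypothetical and are not part of Kafka Streams.

```java
/** Simplified model of an asymmetric join window; names are illustrative, not Kafka Streams APIs. */
final class JoinWindowBoundsSketch {

    /** Assumed condition: leftTs - beforeMs <= rightTs <= leftTs + afterMs. */
    static boolean joins(long leftTs, long rightTs, long beforeMs, long afterMs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    public static void main(String[] args) {
        long beforeMs = 100L; // from ofTimeDifferenceWithNoGrace(ofMillis(100))
        long afterMs = 0L;    // from after(ZERO)

        // Left record B3 has timestamp 1003 in the quoted test.
        System.out.println(joins(1003L, 903L, beforeMs, afterMs));  // right record exactly 100 ms earlier
        System.out.println(joins(1003L, 1003L, beforeMs, afterMs)); // right record with the same timestamp
        System.out.println(joins(1003L, 1004L, beforeMs, afterMs)); // right record 1 ms later
    }
}
```

With `after(ZERO)`, no right record that is later than the left record can join it, which is why a left record can be declared non-joined as soon as stream time moves past its own timestamp.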
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1483677544 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1483962804 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven closed pull request #14426: KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items URL: https://github.com/apache/kafka/pull/14426
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935587072 Accidentally closed the PR; reopening it.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935911914 Merged the code of [KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR. Everything else was left as it was. All the tests still passed.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1936706379
> Merged the code of [KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR.

Why? We are mixing up two tickets if we do this (cf. https://github.com/apache/kafka/pull/14426#discussion_r1483677544). Can you remove those changes? Fixing the grace period should be kept separate, so that we get different commits for different fixes.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
florin-akermann commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1936963868
> All the tests still passed.

What has currently been merged from https://issues.apache.org/jira/browse/KAFKA-16123 into this PR wouldn't solve the general case (records with non-null keys). I agree with Matthias; I would keep it separate.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
florin-akermann commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1937311592 I have now pushed a 'generalized' fix for [KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123).
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1938209556 So I reverted the code of [KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123).
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1491788610

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##
@@ -609,17 +842,18 @@ public void testOrdering() {
             inputTopic1.pipeInput(1, "A1", 100L);
             processor.checkAndClearProcessResult();
 
-            // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-            // the joined records
+            // push one item to the other window that has a join;
+            // this should produce the joined record first;
+            // then non-joined record with a closed window
             // by the time they were produced before
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // --> w2 = { 1:a1 (ts: 110) }
             inputTopic2.pipeInput(1, "a1", 110L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
-                new KeyValueTimestamp<>(1, "A1+a1", 110L)
+                new KeyValueTimestamp<>(1, "A1+a1", 110L),
+                new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
I agree that we should not produce a null-joined record and a joined record. Can you elaborate on exactly when (and why) this could happen? It sounds like a bug in `emitNonJoinedOuterRecords()` to me. When we check the `outerStore`, we should only emit null-joined records that are "expired", i.e., which belong to already closed windows, and thus the order in which we check should not matter. It sounds as if we might incorrectly emit null-joined records for windows that are not closed yet (which would be a bug), or that we emit an incorrect join record for windows which are already closed. Given that we identified that we indeed have a bug with regard to "late records", and that we don't respect the grace period correctly, I would assume it's the same root cause.

Thus, I am now wondering if we should (1) merge this PR as-is (and fix the order back in the follow-up PR @florin-akermann is doing), or (2) revert this change in this PR right away, or (3) first try to merge Florin's PR and re-evaluate this change afterwards? Option (1) seems to be the least desirable to me. I am happy with either (2) or (3). Thoughts?
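The "only emit expired records" rule described above can be sketched as follows. This is a simplified, hypothetical model and not the actual `emitNonJoinedOuterRecords()` code: it assumes a buffered left record counts as expired once `ts + afterMs + graceMs < streamTime`, and the class, method, and parameter names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified, hypothetical model of the expiry check; not the Kafka Streams implementation. */
final class OuterJoinExpirySketch {

    /** Assumption: the window of a left record is closed once ts + afterMs + graceMs < streamTime. */
    static boolean isExpired(long ts, long afterMs, long graceMs, long streamTime) {
        return ts + afterMs + graceMs < streamTime;
    }

    /** Returns the null-joined results for all expired buffered records, in timestamp order. */
    static List<String> emitExpired(TreeMap<Long, String> buffer, long afterMs, long graceMs, long streamTime) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> entry : buffer.entrySet()) {
            if (!isExpired(entry.getKey(), afterMs, graceMs, streamTime)) {
                break; // the buffer is sorted by timestamp, so later records are not expired either
            }
            out.add(entry.getValue() + "+null");
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the after(ZERO) test quoted earlier in this thread: B0..B3 at ts 1000..1003.
        TreeMap<Long, String> buffer = new TreeMap<>();
        for (int i = 0; i < 4; i++) {
            buffer.put(1000L + i, "B" + i);
        }
        System.out.println(emitExpired(buffer, 0L, 0L, 1003L));
    }
}
```

Under this model the set of emitted null-joined records depends only on stream time, so running the check before or after the join lookup changes the output order but not the output set.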
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1492090658

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##
@@ -609,17 +842,18 @@ public void testOrdering() {
             inputTopic1.pipeInput(1, "A1", 100L);
             processor.checkAndClearProcessResult();
 
-            // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-            // the joined records
+            // push one item to the other window that has a join;
+            // this should produce the joined record first;
+            // then non-joined record with a closed window
             // by the time they were produced before
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // --> w2 = { 1:a1 (ts: 110) }
             inputTopic2.pipeInput(1, "a1", 110L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
-                new KeyValueTimestamp<>(1, "A1+a1", 110L)
+                new KeyValueTimestamp<>(1, "A1+a1", 110L),
+                new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
When we change the order in which we check back to what it was, only the ordering tests fail. So there seems to be no bug (anymore). I think that in a previous version of this PR (before we checked both the left and the right side in the outer join) we saw that the order did matter, and a left-join test failed. But this has now been solved with `getOuterJoinLookBackTimeMs()`. I think both option 2 and option 3 will do fine.
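As a small illustration of why only the ordering tests are sensitive to this change, the following sketch compares the two emit orders for the `testOrdering` case quoted above (`A0+null` at ts 0, `A1+a1` at ts 110). It is a hypothetical model, not Kafka Streams code; all names are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the two emit orders discussed in this thread; not Kafka Streams code. */
final class EmitOrderSketch {

    /** Output timestamps of one process() call, given the expired non-joined and the joined results. */
    static List<Long> process(List<Long> expiredNonJoinedTs, List<Long> joinedTs, boolean nonJoinedFirst) {
        List<Long> out = new ArrayList<>();
        if (nonJoinedFirst) {
            out.addAll(expiredNonJoinedTs); // original behavior: flush closed windows first
            out.addAll(joinedTs);
        } else {
            out.addAll(joinedTs);           // behavior under review: join first, flush afterwards
            out.addAll(expiredNonJoinedTs);
        }
        return out;
    }

    /** True if the timestamps never decrease. */
    static boolean isInTimestampOrder(List<Long> timestamps) {
        for (int i = 1; i < timestamps.size(); i++) {
            if (timestamps.get(i) < timestamps.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> expired = List.of(0L);  // A0+null, ts 0
        List<Long> joined = List.of(110L); // A1+a1, ts 110

        List<Long> before = process(expired, joined, true);
        List<Long> after = process(expired, joined, false);
        System.out.println(before + " in order: " + isInTimestampOrder(before));
        System.out.println(after + " in order: " + isInTimestampOrder(after));
    }
}
```

Both orders produce the same two results; only the variant that flushes expired records first keeps the output in non-decreasing timestamp order.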
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1492754199

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ##
@@ -609,17 +842,18 @@ public void testOrdering() {
             inputTopic1.pipeInput(1, "A1", 100L);
             processor.checkAndClearProcessResult();
 
-            // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-            // the joined records
+            // push one item to the other window that has a join;
+            // this should produce the joined record first;
+            // then non-joined record with a closed window
             // by the time they were produced before
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // --> w2 = { 1:a1 (ts: 110) }
             inputTopic2.pipeInput(1, "a1", 110L);
             processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L),
-                new KeyValueTimestamp<>(1, "A1+a1", 110L)
+                new KeyValueTimestamp<>(1, "A1+a1", 110L),
+                new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
It seems this PR is basically ready for merging, so it might be faster to go with option (2): revert the change to the order in this PR, and then we can merge it. -- Of course, I would want to make a final pass after the change, to check the testing code again and verify correctness.