This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee61bb721ee KAFKA-15417: move outerJoinBreak-flags out of the loop 
(#15510)
ee61bb721ee is described below

commit ee61bb721eecb0404929f125fe43392f3d024453
Author: Victor van den Hoven <victor.vanden.ho...@alliander.com>
AuthorDate: Tue Apr 2 15:46:54 2024 +0200

    KAFKA-15417: move outerJoinBreak-flags out of the loop (#15510)
    
    Follow up PR for https://github.com/apache/kafka/pull/14426 to fix a bug 
introduced by the previous PR.
    
    Cf https://github.com/apache/kafka/pull/14426#discussion_r1518681146
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../kstream/internals/KStreamKStreamJoin.java      |  65 ++++++------
 .../internals/KStreamKStreamOuterJoinTest.java     | 111 +++++++++++++++++++--
 2 files changed, 140 insertions(+), 36 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 124386b9bc3..b8b48ff2c4d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -125,9 +125,11 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
         @SuppressWarnings("unchecked")
         @Override
         public void process(final Record<K, V1> record) {
+
             final long inputRecordTimestamp = record.timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
             if (outer && record.key() == null && record.value() != null) {
@@ -193,7 +195,6 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
             }
         }
 
-        @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(
             final KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>> store,
             final Record<K, V1> record) {
@@ -223,43 +224,35 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
+                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
+                        // if windows are open for both joinSides we can break 
since there are no more candidates to emit
+                        break;
+                    }
                     final KeyValue<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> 
timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = 
timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Continue with the next outer record if window for this 
joinSide has not closed yet
+                    // There might still be an outer record for the other joinSide 
whose window has already closed
+                    // We rely on the <timestamp><left/right-boolean><key> 
ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more 
candidates to emit on left-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no 
more candidates to emit on left-outerJoin-side
                         } else {
-                            outerJoinRightBreak = true; // there are no more 
candidates to emit on right-outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no 
more candidates to emit on right-outerJoin-side
                         }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on 
left-outerJoin-side and
-                                    // right-outerJoin-side
-                        } else {
-                            continue; // there are possibly candidates left on 
the other outerJoin-side
-                        }
-                    }
-
-                    final VOut nullJoinedValue;
-                    if (isLeftSide) {
-                        nullJoinedValue = joiner.apply(key,
-                                value.getLeftValue(),
-                                value.getRightValue());
-                    } else {
-                        nullJoinedValue = joiner.apply(key,
-                                (V1) value.getRightValue(),
-                                (V2) value.getLeftValue());
+                        // We continue with the next outer record
+                        continue;
                     }
-
+                    
+                    final K key = timestampedKeyAndJoinSide.getKey();
+                    final LeftOrRightValue<V1, V2> leftOrRightValue = 
next.value;
+                    final VOut nullJoinedValue = getNullJoinedValue(key, 
leftOrRightValue);
                     context().forward(
                         
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
                     );
@@ -272,7 +265,6 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
                         // we do not use delete() calls since it would incur 
extra get()
                         store.put(prevKey, null);
                     }
-
                     prevKey = timestampedKeyAndJoinSide;
                 }
 
@@ -283,7 +275,24 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
             }
         }
 
-        private long getOuterJoinLookBackTimeMs(final 
TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
+        @SuppressWarnings("unchecked")
+        private VOut getNullJoinedValue(
+            final K key, 
+            final LeftOrRightValue<V1, V2> leftOrRightValue) {
+            // depending on the JoinSide fill in the joiner key and joiner 
values
+            if (isLeftSide) {
+                return joiner.apply(key,
+                        leftOrRightValue.getLeftValue(),
+                        leftOrRightValue.getRightValue());
+            } else {
+                return joiner.apply(key,
+                        (V1) leftOrRightValue.getRightValue(),
+                        (V2) leftOrRightValue.getLeftValue());
+            }
+        }
+
+        private long getOuterJoinLookBackTimeMs(
+            final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
             // depending on the JoinSide we fill in the outerJoinLookBackTimeMs
             if (timestampedKeyAndJoinSide.isLeftSide()) {
                 return windowsAfterMs; // On the left-JoinSide we look back in 
time
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 28a5f1488fb..279c21ef61a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -499,10 +499,10 @@ public class KStreamKStreamOuterJoinTest {
             // joined records because the window has ended, but will not 
produce non-joined records because the window has not closed.
             // w1 = { 0:A0 (ts: 0) }
             // w2 = { 1:a1 (ts: 0) }
-            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
-            // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
-            inputTopic2.pipeInput(0, "a0", 101L);
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 101) }
+            // --> w2 = { 1:a1 (ts: 0), 0:a0 (ts: 101) }
             inputTopic1.pipeInput(1, "A1", 101L);
+            inputTopic2.pipeInput(0, "a0", 101L);
             processor.checkAndClearProcessResult();
 
             // push a dummy item to any stream after the window is closed; 
this should produce all expired non-joined records because
@@ -511,7 +511,7 @@ public class KStreamKStreamOuterJoinTest {
             // w2 = { 1:a1 (ts: 0), 0:a0 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 101) }
             // --> w2 = { 1:a1 (ts: 0), 0:a0 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
@@ -519,6 +519,101 @@ public class KStreamKStreamOuterJoinTest {
         }
     }
 
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce 
any items because there are no matching keys
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not 
produce any items because there are no matching keys
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any 
items because there are no matching keys
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the other stream; this should produce no 
inner joined-items because there are no matching keys 
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // w2 = { 2:a2 (ts: 31) }
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36) }
+            inputTopic2.pipeInput(3, "a3", 36L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the other stream; this should produce no 
inner joined-items because there are no matching keys 
+            // and should produce a right-join-item because its before-window 
has closed
+            // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36) }
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37) }
+            inputTopic2.pipeInput(4, "a4", 37L);
+            processor.checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(2, "null+a2", 31L)
+            );
+
+            // push another item to the other stream; this should produce no 
inner joined-items because there are no matching keys 
+            // and should produce a left-join-item because its after-window has 
closed
+            // and should produce two right-join-items because their before-windows 
have closed
+            // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37) }
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37), 5:a5 
(ts: 50) }
+            inputTopic2.pipeInput(5, "a5", 50L);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 29L),
+                new KeyValueTimestamp<>(3, "null+a3", 36L),
+                new KeyValueTimestamp<>(4, "null+a4", 37L)
+            );
+        }
+    }
+    
     @Test
     public void testOuterJoinWithInMemoryCustomSuppliers() {
         final JoinWindows joinWindows = 
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
@@ -727,7 +822,7 @@ public class KStreamKStreamOuterJoinTest {
     }
 
     @Test
-    public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+    public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
@@ -814,7 +909,7 @@ public class KStreamKStreamOuterJoinTest {
     }
 
     @Test
-    public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
+    public void testShouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[] {0, 1, 2, 3};
 
@@ -906,7 +1001,7 @@ public class KStreamKStreamOuterJoinTest {
      * behavior so that we can make decisions about defining it in the future.
      */
     @Test
-    public void shouldForwardCurrentHeaders() {
+    public void testShouldForwardCurrentHeaders() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Integer, String> stream1;
@@ -1331,7 +1426,7 @@ public class KStreamKStreamOuterJoinTest {
     }
 
     @Test
-    public void shouldJoinWithNonTimestampedStore() {
+    public void testShouldJoinWithNonTimestampedStore() {
         final CapturingStoreSuppliers suppliers = new 
CapturingStoreSuppliers();
         final StreamJoined<Integer, String, String> streamJoined =
                 StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())

Reply via email to