mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527393681


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -727,7 +801,7 @@ public void testWindowing() {
     }
 
     @Test
-    public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+    public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {

Review Comment:
   For this PR, let it be, but for the future, please avoid unnecessary renaming.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                                    // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            continue; // there are possibly candidates left on the other outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
                         }
                     }
 
-                    final VOut nullJoinedValue;
-                    if (isLeftSide) {
-                        nullJoinedValue = joiner.apply(key,
-                                value.getLeftValue(),
-                                value.getRightValue());
-                    } else {
-                        nullJoinedValue = joiner.apply(key,
-                                (V1) value.getRightValue(),
-                                (V2) value.getLeftValue());
+                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
+                        // if windows are open for both joinSides we can break since there are no more candidates to emit
+                        break;
+                    }  else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) {
+                        // else if  window is open only for this joinSide we continue with the next outer record
+                        continue;
                     }
 
-                    context().forward(
-                        record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
-                    );
-
-                    if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
-                        // blind-delete the previous key from the outer window store now it is emitted;
-                        // we do this because this delete would remove the whole list of values of the same key,
-                        // and hence if we delete eagerly and then fail, we would miss emitting join results of the later
-                        // values in the list.
-                        // we do not use delete() calls since it would incur extra get()
-                        store.put(prevKey, null);
+                    final K key = timestampedKeyAndJoinSide.getKey();
+                    final LeftOrRightValue<V1, V2> leftOrRightValue = next.value;
+                    final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue);
+                    if (nullJoinedValue != null) {

Review Comment:
   Why this condition? The user's `ValueJoiner` can return anything, including `null`, and we would still emit a result record.
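   
   Just to illustrate what I mean (a sketch only, re-using the helper and variable names from this PR; not tested):
   ```java
   // forward unconditionally -- a user ValueJoiner may legitimately return null
   final K key = timestampedKeyAndJoinSide.getKey();
   final LeftOrRightValue<V1, V2> leftOrRightValue = next.value;
   final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue);
   context().forward(
       record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
   );
   ```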



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                                    // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            continue; // there are possibly candidates left on the other outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side

Review Comment:
   As above, I think we can `continue` right away.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -511,14 +511,88 @@ public void testGracePeriod() {
             // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
             );
         }
     }
 
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+

Review Comment:
   Should we add another step, pushing a record with `ts=36`, to ensure that the right-hand-side record with `ts=31`, whose window ends and closes at `36`, is not emitted yet?
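   
   Something like the following (topic, key, and value are just placeholders, and this assumes that piping a record at `ts=36` advances stream time without closing the right-hand-side window):
   ```java
   // hypothetical step: at stream time 36 the window of 2:a2 (ts: 31) is still open,
   // so no outer-join result should be emitted yet
   inputTopic1.pipeInput(3, "A3", 36L);
   processor.checkAndClearProcessResult();
   ```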



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                                    // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side

Review Comment:
   I think we can already `continue` here right away? After we find a left record with an open window, we know we don't emit it now, and can go to the next record right away?
   
   This should simplify the logic further below (cf. other comments)?
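   
   Roughly like this (sketch only, re-using the flag names from this PR; not tested):
   ```java
   if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
       if (timestampedKeyAndJoinSide.isLeftSide()) {
           outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
       } else {
           outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
       }
       // this record's window is still open, so it won't be emitted now; move on to the next record
       continue;
   }
   ```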



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -511,14 +511,88 @@ public void testGracePeriod() {
             // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
             );
         }
     }
 
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the other stream; this should produce no joined-items because there are no joins 

Review Comment:
   Should we split this into first pushing a record with `ts=37`, to close only one window, to ensure that the other, still-open window does not prevent us from emitting, and also to ensure that the still-open window is not closed too early?
   
   nit: `no inner joined-items because there are no matching keys`
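   
   For example (topic, key, value, and the exact expected output are only assumptions about how the rest of the test is set up):
   ```java
   // hypothetical first half of the split: stream time 37 closes only the right-hand-side
   // window of 2:a2 (ts: 31), while the windows of 0:A0 (ts: 29) and 1:A1 (ts: 30) stay open
   inputTopic1.pipeInput(3, "A3", 37L);
   processor.checkAndClearProcessResult(
       new KeyValueTimestamp<>(2, "null+a2", 31L)
   );
   ```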



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                                    // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            continue; // there are possibly candidates left on the other outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
                         }
                     }
 
-                    final VOut nullJoinedValue;
-                    if (isLeftSide) {
-                        nullJoinedValue = joiner.apply(key,
-                                value.getLeftValue(),
-                                value.getRightValue());
-                    } else {
-                        nullJoinedValue = joiner.apply(key,
-                                (V1) value.getRightValue(),
-                                (V2) value.getLeftValue());
+                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {

Review Comment:
   I think we should move this check further up, i.e., directly as the first check when we enter the `while (it.hasNext())` loop -- for this case, we only need the "then" part and can remove the `else` part (as we already added `continue` above, and thus can drop the `windowOpenForJoinSide` helper method)?
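   
   Roughly something like this (sketch only, based on the names used in this PR; the emit logic at the end of the loop is elided):
   ```java
   while (it.hasNext()) {
       if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
           // windows are open for both join sides, so no more candidates can follow
           break;
       }
       final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
       final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
       final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
       sharedTimeTracker.minTime = timestamp;

       final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
       if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
           if (timestampedKeyAndJoinSide.isLeftSide()) {
               outerJoinLeftWindowOpen = true;
           } else {
               outerJoinRightWindowOpen = true;
           }
           continue; // no `windowOpenForJoinSide` helper needed
       }

       // ... emit the outer-join result for this record and clean up the store ...
   }
   ```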



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -511,14 +511,88 @@ public void testGracePeriod() {
             // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
             );
         }
     }
 
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the other stream; this should produce no joined-items because there are no joins 
+            // and should produce a not-joined-item of the left joinSide because after window has ended

Review Comment:
   nit: `not-joined-item of the left joinSide` -> `a left-join item` (also on the next line below)
   
   To simplify and re-use the terminology we use in other tests:
   
   - inner result: `<key, (left, right)>`
   - left result: `<key, (left, null)>`
   - right result: `<key, (null, right)>`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
