raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1593500243


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords(
             // reset to MAX_VALUE in case the store is empty
             sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-            try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
+            try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
-                boolean outerJoinLeftWindowOpen = false;
-                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
-                        // if windows are open for both joinSides we can break since there are no more candidates to emit
+                    final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>> nextKeyValue = it.next();
+                    final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key;
+                    sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp();
+                    if (isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) {

Review Comment:
   Yeah, you are right! That's a good catch. I was relying too heavily on the tests: even after reverting the changes, I couldn't find a single test that triggered this condition.
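
   For illustration only, here is a minimal, self-contained sketch of the kind of input that would reach the early `break` in the removed lines. It is a simplified model and not the Kafka implementation: `OuterJoinEarlyBreakSketch`, `Entry`, `Result`, `emitExpired`, and the look-back and grace parameters are all hypothetical names, and the window check is an assumption modelled loosely on the flags shown in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model (assumption, not Kafka code) of a scan over outer-join
// candidates ordered by timestamp, with one "window open" flag per join side.
class OuterJoinEarlyBreakSketch {

    // Hypothetical stand-in for a store entry.
    static final class Entry {
        final long timestamp;
        final boolean leftSide;
        final String key;

        Entry(final long timestamp, final boolean leftSide, final String key) {
            this.timestamp = timestamp;
            this.leftSide = leftSide;
            this.key = key;
        }
    }

    // Keys that were emitted, plus how many entries the loop inspected.
    static final class Result {
        final List<String> emitted;
        final int visited;

        Result(final List<String> emitted, final int visited) {
            this.emitted = emitted;
            this.visited = visited;
        }
    }

    static Result emitExpired(final List<Entry> sortedByTimestamp,
                              final long streamTime,
                              final long leftLookBackMs,
                              final long rightLookBackMs,
                              final long graceMs) {
        final List<String> emitted = new ArrayList<>();
        int visited = 0;
        boolean leftWindowOpen = false;
        boolean rightWindowOpen = false;

        for (final Entry entry : sortedByTimestamp) {
            if (leftWindowOpen && rightWindowOpen) {
                // Both sides still have open windows: later entries cannot be emitted.
                break;
            }
            visited++;
            final long lookBack = entry.leftSide ? leftLookBackMs : rightLookBackMs;
            if (entry.timestamp + lookBack + graceMs >= streamTime) {
                // Window for this side is still open; remember it and skip the entry.
                if (entry.leftSide) {
                    leftWindowOpen = true;
                } else {
                    rightWindowOpen = true;
                }
                continue;
            }
            emitted.add(entry.key);
        }
        return new Result(emitted, visited);
    }

    public static void main(final String[] args) {
        final List<Entry> entries = Arrays.asList(
            new Entry(10L, true, "a"),   // left, expired
            new Entry(60L, false, "b"),  // right, window still open
            new Entry(70L, true, "c"),   // left, expired
            new Entry(95L, true, "d"),   // left, window still open
            new Entry(96L, false, "e")   // never inspected: loop breaks first
        );
        final Result result = emitExpired(entries, 100L, 10L, 50L, 0L);
        System.out.println("emitted=" + result.emitted + " visited=" + result.visited);
    }
}
```

   In this model the break is only reached when the two sides use different look-back windows and the store still holds a later entry after both flags are set, which may be why a test with symmetric windows would never exercise it.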



