spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610754025



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer 
non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+                            store.put(otherJoinKey, null, 
otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to 
check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final 
WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long 
maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired 
yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the 
order varies depending whether
+                    // this join is using a reverse joiner or not. Also 
whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) 
e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, 
(V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, 
(V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) 
e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from tne outer window store now it is 
emitted
+                    store.put(e.key.key(), null, e.key.window().start());

Review comment:
       I'm not sure if this is possible. If I put or delete a future record 
that the iterator hasn't seen, then the iterator will reflect that change if I 
continue iterating until the deleted or new record appears. This is how Rocksdb 
iterator works, isn't it? When asking the next record, it only reads one record 
from the store. If subsequent records are deleted or are new, then the iterator 
will read the update in the next next() call.
   
   Or what test case do you have in mind? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to