spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r610891690
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + + outerJoinWindowStore.ifPresent(store -> { + // Delete the other joined key from the outer non-joined store now to prevent + // further processing + final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); + if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { + store.put(otherJoinKey, null, otherRecordTimestamp); + } + }); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. 
T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // the condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (timeTo < maxStreamTime) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(thisJoin, key), + makeValueOrOtherValue(thisJoin, value), + inputRecordTimestamp)); + } + } + + outerJoinWindowStore.ifPresent(store -> { + // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) + // if the current record is late, then there is no need to check for expired records + if (inputRecordTimestamp == maxStreamTime) { + maybeEmitOuterExpiryRecords(store, maxStreamTime); + } + }); + } + } + + private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) { + return thisJoin + ? ValueOrOtherValue.makeValue(value) + : ValueOrOtherValue.makeOtherValue(value); + } + + @SuppressWarnings("unchecked") + private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) { + try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) { + while (it.hasNext()) { + final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next(); + + // Skip next records if the oldest record has not expired yet + if (e.key.window().end() + joinGraceMs >= maxStreamTime) { + break; + } + + final K key = e.key.key().getKey(); + + // Emit the record by joining with a null value. But the order varies depending whether + // this join is using a reverse joiner or not. Also whether the returned record from the + // outer window store is a V1 or V2 value. 
+ if (thisJoin) { + if (e.key.key().isThisJoin()) { + context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null)); + } else { + context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue())); + } + } else { + if (e.key.key().isThisJoin()) { + context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue())); + } else { + context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null)); + } + } + + // Delete the key from the outer window store now it is emitted Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org