guozhangwang commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r622657322
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; + boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; + joinFound = true; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; + + // Emit expired records before the joined record to keep time ordering + emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp); + context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + } + + // Emit all expired records before adding a new non-joined record to the store. Otherwise, + // the put() call will advance the stream time, which causes records out of the retention + // period to be deleted, thus not being emitted later. + if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) { + emitExpiredNonJoinedOuterRecords(); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // the condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) { Review comment: @spena seems there are a few different conditions we can consider here: 1) record time < stream time - window length - grace length: the record is too late, we should drop it up front and also record the `droppedRecordsSensorOrExpiredWindowRecordDropSensor`. 2) record time >= stream time - window length - grace length, but < stream time: the record is still late, but joinable, since the stream time would not be advanced we would not have to check and emit non-joined records, but just try to join this record with the other window. Note that like @mjsax said, for the returned matching record, we also need to check if the other record time >= stream time - window length - grace length or not. 3) record time > stream time, we would first try to emit non-joined records, and then try to join this record. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; + boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + maxObservedStreamTime.advance(inputRecordTimestamp); + Review comment: @spena @mjsax I left another comment below regarding the time. Please LMK id you think that makes sense. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; + boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; + joinFound = true; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; Review comment: @spena just ping to make sure you get this on the follow-up PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org