VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1399180900
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -134,29 +129,24 @@ public void process(final Record<K, V1> record) { final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); - - // Emit all non-joined records which window has closed - if (inputRecordTimestamp == sharedTimeTracker.streamTime) { - outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record)); - } try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); final long otherRecordTimestamp = otherRecord.key; - outerJoinStore.ifPresent(store -> { - // use putIfAbsent to first read and see if there's any values for the key, - // if yes delete the key, otherwise do not issue a put; - // we may delete some values with the same key early but since we are going - // range over all values of the same key even after failure, since the other window-store - // is only cleaned up by stream time, so this is okay for at-least-once. - store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null); - }); - - context().forward( - record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value)) - .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + outerJoinStore.ifPresent(store -> + // Use putIfAbsent to first read and see if there's any values for the key, + // if yes delete the key, otherwise do not issue a put; + // we may delete some values with the same key early but since we are going + // range over all values of the same key even after failure, since the other + // window-store + // is only cleaned up by stream time, so this is okay for at-least-once. + store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), + null)); + + context().forward(record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value)) + .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); Review Comment: changes reverted ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -165,40 +155,52 @@ public void process(final Record<K, V1> record) { // problem: // // Say we have a window size of 5 seconds - // 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10) - // The record is not processed yet, and is added to the outer-join store - // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) - // The record is not processed yet, and is added to the outer-join store - // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) - // It is time to look at the expired records. T10 and T2 should be emitted, but - // because T2 was late, then it is not fetched by the window store, so it is not processed + // 1. A non-joined record with time T10 is seen in the left-topic + // (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic + // (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic + // (maxLeftStreamTime: 11) + // It is time to look at the expired records. T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not + // processed // // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests // - // This condition below allows us to process the out-of-order records without the need + // This condition below allows us to process the out-of-order records without + // the need // to hold it in the temporary outer store if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } else { sharedTimeTracker.updatedMinTime(inputRecordTimestamp); outerJoinStore.ifPresent(store -> store.put( - TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp), - LeftOrRightValue.make(isLeftSide, record.value()))); + TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp), + LeftOrRightValue.make(isLeftSide, record.value()))); Review Comment: changes reverted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org