gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1583698116
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -240,27 +185,33 @@ private void emitNonJoinedOuterRecords( // There might be an outer record for the other joinSide which window has not closed yet // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); - if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { + if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs + >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { - outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side + outerJoinLeftWindowOpen = + true; // there are no more candidates to emit on left-outerJoin-side } else { - outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side + outerJoinRightWindowOpen = + true; // there are no more candidates to emit on right-outerJoin-side } // We continue with the next outer record continue; } - + final K key = timestampedKeyAndJoinSide.getKey(); - final LeftOrRightValue<V1, V2> leftOrRightValue = next.value; - final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue); + final LeftOrRightValue<VL, VR> leftOrRightValue = next.value; + final VThis thisValue = getThisValue(leftOrRightValue); + final VOther otherValue = getOtherValue(leftOrRightValue); + final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue); context().forward( - record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) + record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) ); Review Comment: nit: This could be brought out to a method which depends only on `record` and `next`. 
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## Review Comment: > No more un safe type casting to be found in this class. Not that much has changed in this abstract class. I have moved some code in methods. Very nice! I really like the new types, and the abstract methods are very minimal. > Side Note: I don't have the correct formatter configured :/ I couldn't find any contribution notes to set the correct one. I'm not aware of a standard formatter either. I remember tweaking the IntelliJ default rules slightly, but taking a look at them now I'm not seeing evidence of what I changed. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java: ########## Review Comment: The left and right classes are perfect in my opinion. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -43,161 +42,106 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> { +abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - - private final String otherWindowName; + private final boolean outer; + private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? 
extends VOut> joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; + private final String otherWindowName; + private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; + private final Optional<String> outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; - private final boolean outer; - private final boolean isLeftSide; - private final Optional<String> outerJoinWindowName; - private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner; - - private final TimeTrackerSupplier sharedTimeTrackerSupplier; - - KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner, - final boolean outer, - final Optional<String> outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { - this.isLeftSide = isLeftSide; + KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier, + final Optional<String> outerJoinWindowName, final long joinBeforeMs, + final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer, + final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? 
extends VOut> joiner) { this.otherWindowName = otherWindowName; - if (isLeftSide) { - this.joinBeforeMs = windows.beforeMs; - this.joinAfterMs = windows.afterMs; - } else { - this.joinBeforeMs = windows.afterMs; - this.joinAfterMs = windows.beforeMs; - } - this.windowsAfterMs = windows.afterMs; - this.windowsBeforeMs = windows.beforeMs; - this.joinGraceMs = windows.gracePeriodMs(); + this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; this.enableSpuriousResultFix = windows.spuriousResultFixEnabled(); - this.joiner = joiner; - this.outer = outer; this.outerJoinWindowName = outerJoinWindowName; - this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; - } - - @Override - public Processor<K, V1, K, VOut> get() { - return new KStreamKStreamJoinProcessor(); + this.joinBeforeMs = joinBeforeMs; + this.joinAfterMs = joinAfterMs; + this.joinGraceMs = windows.gracePeriodMs(); + this.windowsBeforeMs = windows.beforeMs; + this.windowsAfterMs = windows.afterMs; + this.outer = outer; + this.joiner = joiner; } - private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> { - private WindowStore<K, V2> otherWindowStore; - private Sensor droppedRecordsSensor; - private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty(); + protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> { private InternalProcessorContext<K, VOut> internalProcessorContext; + private Sensor droppedRecordsSensor; + private TimeTracker sharedTimeTracker; + private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>>> outerJoinStore = + Optional.empty(); + private WindowStore<K, VOther> otherWindowStore; @Override public void init(final ProcessorContext<K, VOut> context) { super.init(context); internalProcessorContext = (InternalProcessorContext<K, VOut>) context; - final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); - 
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); + droppedRecordsSensor = + droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); otherWindowStore = context.getStateStore(otherWindowName); sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId()); if (enableSpuriousResultFix) { outerJoinStore = outerJoinWindowName.map(context::getStateStore); sharedTimeTracker.setEmitInterval( - StreamsConfig.InternalConfig.getLong( - context.appConfigs(), - EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, - 1000L - ) + StreamsConfig.InternalConfig.getLong( + context.appConfigs(), + EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, + 1000L + ) ); } } - @SuppressWarnings("unchecked") @Override - public void process(final Record<K, V1> record) { - - final long inputRecordTimestamp = record.timestamp(); - final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); - final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); - - sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); - - if (outer && record.key() == null && record.value() != null) { - context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); + public void process(final Record<K, VThis> thisRecord) { Review Comment: nit: Personally, I don't think calling them thisRecord and thisRecordTimestamp is necessary, because the type checker will make sure we're using the right one. 
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -275,24 +226,47 @@ private void emitNonJoinedOuterRecords( } } - @SuppressWarnings("unchecked") - private VOut getNullJoinedValue( - final K key, - final LeftOrRightValue<V1, V2> leftOrRightValue) { - // depending on the JoinSide fill in the joiner key and joiner values - if (isLeftSide) { - return joiner.apply(key, - leftOrRightValue.getLeftValue(), - leftOrRightValue.getRightValue()); - } else { - return joiner.apply(key, - (V1) leftOrRightValue.getRightValue(), - (V2) leftOrRightValue.getLeftValue()); + + private void performInnerJoin(final Record<K, VThis> thisRecord, final long thisRecordTimestamp) { Review Comment: nit: I think this is a bit of a confusing method name, since it mentions InnerJoin, but it has needOuterJoin, and interacts with the outerJoinStore. I think this should be inlined back into process(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org