gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1591293880
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -184,20 +146,34 @@ public void process(final Record<K, V1> record) { // This condition below allows us to process the out-of-order records without the need // to hold it in the temporary outer store if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) { - context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); + context().forward( + record.withValue(joiner.apply(record.key(), record.value(), null))); Review Comment: nit: revert ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -43,95 +42,73 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> { +abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - - private final String otherWindowName; + private final boolean outer; + private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; + private final String otherWindowName; + private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; + private final Optional<String> outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; - private final boolean outer; - private final boolean isLeftSide; - private final Optional<String> outerJoinWindowName; - private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner; - - private final TimeTrackerSupplier sharedTimeTrackerSupplier; - - KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner, - final boolean outer, - final Optional<String> outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { - this.isLeftSide = isLeftSide; + KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier, + final Optional<String> outerJoinWindowName, final long joinBeforeMs, + final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer, + final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner) { Review Comment: Revert the re-ordering here that isn't necessary, so it's easy to see the signature change for `joiner` and the removal of `isLeftSide`. We should also put each constructor argument on it's own to follow the previous style. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -43,95 +42,73 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> { +abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - - private final String otherWindowName; + private final boolean outer; + private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; + private final String otherWindowName; + private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; + private final Optional<String> outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; - private final boolean outer; - private final boolean isLeftSide; - private final Optional<String> outerJoinWindowName; - private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner; - - private final TimeTrackerSupplier sharedTimeTrackerSupplier; - - KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner, - final boolean outer, - final Optional<String> outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { - this.isLeftSide = isLeftSide; + KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier, + final Optional<String> outerJoinWindowName, final long joinBeforeMs, + final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer, + final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner) { this.otherWindowName = otherWindowName; - if (isLeftSide) { - this.joinBeforeMs = windows.beforeMs; - this.joinAfterMs = windows.afterMs; - } else { - this.joinBeforeMs = windows.afterMs; - this.joinAfterMs = windows.beforeMs; - } - this.windowsAfterMs = windows.afterMs; - this.windowsBeforeMs = windows.beforeMs; - this.joinGraceMs = windows.gracePeriodMs(); + this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; this.enableSpuriousResultFix = windows.spuriousResultFixEnabled(); - this.joiner = joiner; - this.outer = outer; this.outerJoinWindowName = outerJoinWindowName; - this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; - } - - @Override - public Processor<K, V1, K, VOut> get() { - return new KStreamKStreamJoinProcessor(); + this.joinBeforeMs = joinBeforeMs; + this.joinAfterMs = joinAfterMs; + this.joinGraceMs = windows.gracePeriodMs(); + this.windowsBeforeMs = windows.beforeMs; + this.windowsAfterMs = windows.afterMs; + this.outer = outer; + this.joiner = joiner; Review Comment: nit: Reorder this to make the diff smaller. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -43,95 +42,73 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> { +abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - - private final String otherWindowName; + private final boolean outer; + private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; + private final String otherWindowName; + private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; + private final Optional<String> outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; - private final boolean outer; - private final boolean isLeftSide; - private final Optional<String> outerJoinWindowName; - private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner; - - private final TimeTrackerSupplier sharedTimeTrackerSupplier; - - KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner, - final boolean outer, - final Optional<String> outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { - this.isLeftSide = isLeftSide; + KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier, + final Optional<String> outerJoinWindowName, final long joinBeforeMs, + final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer, + final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner) { this.otherWindowName = otherWindowName; - if (isLeftSide) { - this.joinBeforeMs = windows.beforeMs; - this.joinAfterMs = windows.afterMs; - } else { - this.joinBeforeMs = windows.afterMs; - this.joinAfterMs = windows.beforeMs; - } - this.windowsAfterMs = windows.afterMs; - this.windowsBeforeMs = windows.beforeMs; - this.joinGraceMs = windows.gracePeriodMs(); + this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; this.enableSpuriousResultFix = windows.spuriousResultFixEnabled(); - this.joiner = joiner; - this.outer = outer; this.outerJoinWindowName = outerJoinWindowName; - this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; - } - - @Override - public Processor<K, V1, K, VOut> get() { - return new KStreamKStreamJoinProcessor(); + this.joinBeforeMs = joinBeforeMs; + this.joinAfterMs = joinAfterMs; + this.joinGraceMs = windows.gracePeriodMs(); + this.windowsBeforeMs = windows.beforeMs; + this.windowsAfterMs = windows.afterMs; + this.outer = outer; + this.joiner = joiner; } - private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> { - private WindowStore<K, V2> otherWindowStore; - private Sensor droppedRecordsSensor; - private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty(); + protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> { private InternalProcessorContext<K, VOut> internalProcessorContext; + private Sensor droppedRecordsSensor; + private TimeTracker sharedTimeTracker; + private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>>> outerJoinStore = + Optional.empty(); + private WindowStore<K, VOther> otherWindowStore; @Override public void init(final ProcessorContext<K, VOut> context) { super.init(context); internalProcessorContext = (InternalProcessorContext<K, VOut>) context; - final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); - droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); + droppedRecordsSensor = + droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); otherWindowStore = context.getStateStore(otherWindowName); sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId()); if (enableSpuriousResultFix) { outerJoinStore = outerJoinWindowName.map(context::getStateStore); sharedTimeTracker.setEmitInterval( - StreamsConfig.InternalConfig.getLong( - context.appConfigs(), - EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, - 1000L - ) + StreamsConfig.InternalConfig.getLong( + context.appConfigs(), + EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, + 1000L + ) ); Review Comment: nit: revert changes here ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -184,20 +146,34 @@ public void process(final Record<K, V1> record) { // This condition below allows us to process the out-of-order records without the need // to hold it in the temporary outer store if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) { - context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); + context().forward( + record.withValue(joiner.apply(record.key(), record.value(), null))); } else { - sharedTimeTracker.updatedMinTime(inputRecordTimestamp); - outerJoinStore.ifPresent(store -> store.put( - TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp), - LeftOrRightValue.make(isLeftSide, record.value()))); + sharedTimeTracker.updatedMinTime(recordTimestamp); + putInOuterJoinStore(record, recordTimestamp); } } } } + @Override + public void close() { + sharedTimeTrackerSupplier.remove(context().taskId()); + } Review Comment: nit: move this down near the bottom for a smaller diff ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java: ########## @@ -89,8 +74,8 @@ public V2 getRightValue() { @Override public String toString() { return "<" - + ((leftValue != null) ? "left," + leftValue : "right," + rightValue) - + ">"; + + ((leftValue != null) ? "left," + leftValue : "right," + rightValue) + + ">"; Review Comment: nit: revert this indent change ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ########## @@ -43,95 +42,73 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> { +abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - - private final String otherWindowName; + private final boolean outer; + private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; + private final String otherWindowName; + private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; + private final Optional<String> outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; - private final boolean outer; - private final boolean isLeftSide; - private final Optional<String> outerJoinWindowName; - private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner; - - private final TimeTrackerSupplier sharedTimeTrackerSupplier; - - KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, - final JoinWindowsInternal windows, - final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner, - final boolean outer, - final Optional<String> outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { - this.isLeftSide = isLeftSide; + KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier, + final Optional<String> outerJoinWindowName, final long joinBeforeMs, + final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer, + final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner) { this.otherWindowName = otherWindowName; - if (isLeftSide) { - this.joinBeforeMs = windows.beforeMs; - this.joinAfterMs = windows.afterMs; - } else { - this.joinBeforeMs = windows.afterMs; - this.joinAfterMs = windows.beforeMs; - } - this.windowsAfterMs = windows.afterMs; - this.windowsBeforeMs = windows.beforeMs; - this.joinGraceMs = windows.gracePeriodMs(); + this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; this.enableSpuriousResultFix = windows.spuriousResultFixEnabled(); - this.joiner = joiner; - this.outer = outer; this.outerJoinWindowName = outerJoinWindowName; - this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; - } - - @Override - public Processor<K, V1, K, VOut> get() { - return new KStreamKStreamJoinProcessor(); + this.joinBeforeMs = joinBeforeMs; + this.joinAfterMs = joinAfterMs; + this.joinGraceMs = windows.gracePeriodMs(); + this.windowsBeforeMs = windows.beforeMs; + this.windowsAfterMs = windows.afterMs; + this.outer = outer; + this.joiner = joiner; } - private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> { - private WindowStore<K, V2> otherWindowStore; - private Sensor droppedRecordsSensor; - private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty(); + protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> { private InternalProcessorContext<K, VOut> internalProcessorContext; + private Sensor droppedRecordsSensor; + private TimeTracker sharedTimeTracker; + private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>>> outerJoinStore = + Optional.empty(); + private WindowStore<K, VOther> otherWindowStore; Review Comment: nit: reorder this to make the diff smaller. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java: ########## @@ -103,7 +88,7 @@ public boolean equals(final Object o) { } final LeftOrRightValue<?, ?> that = (LeftOrRightValue<?, ?>) o; return Objects.equals(leftValue, that.leftValue) && - Objects.equals(rightValue, that.rightValue); + Objects.equals(rightValue, that.rightValue); Review Comment: nit: revert this indent change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org