gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1591293880


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -184,20 +146,34 @@ public void process(final Record<K, V1> record) {
                     // This condition below allows us to process the 
out-of-order records without the need
                     // to hold it in the temporary outer store
                     if (!outerJoinStore.isPresent() || timeTo < 
sharedTimeTracker.streamTime) {
-                        
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
+                        context().forward(
+                                record.withValue(joiner.apply(record.key(), 
record.value(), null)));

Review Comment:
   nit: revert



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, 
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements 
ProcessorSupplier<K, VThis, K, VOut> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-    private final String otherWindowName;
+    private final boolean outer;
+    private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, 
? extends VOut> joiner;
     private final long joinBeforeMs;
     private final long joinAfterMs;
     private final long joinGraceMs;
+    private final String otherWindowName;
+    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
     private final boolean enableSpuriousResultFix;
+    private final Optional<String> outerJoinWindowName;
     private final long windowsBeforeMs;
     private final long windowsAfterMs;
 
-    private final boolean outer;
-    private final boolean isLeftSide;
-    private final Optional<String> outerJoinWindowName;
-    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joiner;
-
-    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-    KStreamKStreamJoin(final boolean isLeftSide,
-                       final String otherWindowName,
-                       final JoinWindowsInternal windows,
-                       final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, ? extends VOut> joiner,
-                       final boolean outer,
-                       final Optional<String> outerJoinWindowName,
-                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-        this.isLeftSide = isLeftSide;
+    KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier 
sharedTimeTrackerSupplier,
+                       final Optional<String> outerJoinWindowName, final long 
joinBeforeMs,
+                       final long joinAfterMs, final JoinWindowsInternal 
windows, final boolean outer,
+                       final ValueJoinerWithKey<? super K, ? super VThis, ? 
super VOther, ? extends VOut> joiner) {

Review Comment:
   Revert the re-ordering here that isn't necessary, so it's easy to see the 
signature change for `joiner` and the removal of `isLeftSide`.
   
   We should also put each constructor argument on it's own to follow the 
previous style.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, 
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements 
ProcessorSupplier<K, VThis, K, VOut> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-    private final String otherWindowName;
+    private final boolean outer;
+    private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, 
? extends VOut> joiner;
     private final long joinBeforeMs;
     private final long joinAfterMs;
     private final long joinGraceMs;
+    private final String otherWindowName;
+    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
     private final boolean enableSpuriousResultFix;
+    private final Optional<String> outerJoinWindowName;
     private final long windowsBeforeMs;
     private final long windowsAfterMs;
 
-    private final boolean outer;
-    private final boolean isLeftSide;
-    private final Optional<String> outerJoinWindowName;
-    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joiner;
-
-    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-    KStreamKStreamJoin(final boolean isLeftSide,
-                       final String otherWindowName,
-                       final JoinWindowsInternal windows,
-                       final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, ? extends VOut> joiner,
-                       final boolean outer,
-                       final Optional<String> outerJoinWindowName,
-                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-        this.isLeftSide = isLeftSide;
+    KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier 
sharedTimeTrackerSupplier,
+                       final Optional<String> outerJoinWindowName, final long 
joinBeforeMs,
+                       final long joinAfterMs, final JoinWindowsInternal 
windows, final boolean outer,
+                       final ValueJoinerWithKey<? super K, ? super VThis, ? 
super VOther, ? extends VOut> joiner) {
         this.otherWindowName = otherWindowName;
-        if (isLeftSide) {
-            this.joinBeforeMs = windows.beforeMs;
-            this.joinAfterMs = windows.afterMs;
-        } else {
-            this.joinBeforeMs = windows.afterMs;
-            this.joinAfterMs = windows.beforeMs;
-        }
-        this.windowsAfterMs = windows.afterMs;
-        this.windowsBeforeMs = windows.beforeMs;
-        this.joinGraceMs = windows.gracePeriodMs();
+        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
         this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
-        this.joiner = joiner;
-        this.outer = outer;
         this.outerJoinWindowName = outerJoinWindowName;
-        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
-    }
-
-    @Override
-    public Processor<K, V1, K, VOut> get() {
-        return new KStreamKStreamJoinProcessor();
+        this.joinBeforeMs = joinBeforeMs;
+        this.joinAfterMs = joinAfterMs;
+        this.joinGraceMs = windows.gracePeriodMs();
+        this.windowsBeforeMs = windows.beforeMs;
+        this.windowsAfterMs = windows.afterMs;
+        this.outer = outer;
+        this.joiner = joiner;

Review Comment:
   nit: Reorder this to make the diff smaller.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, 
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements 
ProcessorSupplier<K, VThis, K, VOut> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-    private final String otherWindowName;
+    private final boolean outer;
+    private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, 
? extends VOut> joiner;
     private final long joinBeforeMs;
     private final long joinAfterMs;
     private final long joinGraceMs;
+    private final String otherWindowName;
+    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
     private final boolean enableSpuriousResultFix;
+    private final Optional<String> outerJoinWindowName;
     private final long windowsBeforeMs;
     private final long windowsAfterMs;
 
-    private final boolean outer;
-    private final boolean isLeftSide;
-    private final Optional<String> outerJoinWindowName;
-    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joiner;
-
-    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-    KStreamKStreamJoin(final boolean isLeftSide,
-                       final String otherWindowName,
-                       final JoinWindowsInternal windows,
-                       final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, ? extends VOut> joiner,
-                       final boolean outer,
-                       final Optional<String> outerJoinWindowName,
-                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-        this.isLeftSide = isLeftSide;
+    KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier 
sharedTimeTrackerSupplier,
+                       final Optional<String> outerJoinWindowName, final long 
joinBeforeMs,
+                       final long joinAfterMs, final JoinWindowsInternal 
windows, final boolean outer,
+                       final ValueJoinerWithKey<? super K, ? super VThis, ? 
super VOther, ? extends VOut> joiner) {
         this.otherWindowName = otherWindowName;
-        if (isLeftSide) {
-            this.joinBeforeMs = windows.beforeMs;
-            this.joinAfterMs = windows.afterMs;
-        } else {
-            this.joinBeforeMs = windows.afterMs;
-            this.joinAfterMs = windows.beforeMs;
-        }
-        this.windowsAfterMs = windows.afterMs;
-        this.windowsBeforeMs = windows.beforeMs;
-        this.joinGraceMs = windows.gracePeriodMs();
+        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
         this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
-        this.joiner = joiner;
-        this.outer = outer;
         this.outerJoinWindowName = outerJoinWindowName;
-        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
-    }
-
-    @Override
-    public Processor<K, V1, K, VOut> get() {
-        return new KStreamKStreamJoinProcessor();
+        this.joinBeforeMs = joinBeforeMs;
+        this.joinAfterMs = joinAfterMs;
+        this.joinGraceMs = windows.gracePeriodMs();
+        this.windowsBeforeMs = windows.beforeMs;
+        this.windowsAfterMs = windows.afterMs;
+        this.outer = outer;
+        this.joiner = joiner;
     }
 
-    private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, 
V1, K, VOut> {
-        private WindowStore<K, V2> otherWindowStore;
-        private Sensor droppedRecordsSensor;
-        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
+    protected abstract class KStreamKStreamJoinProcessor extends 
ContextualProcessor<K, VThis, K, VOut> {
         private InternalProcessorContext<K, VOut> internalProcessorContext;
+        private Sensor droppedRecordsSensor;
+
         private TimeTracker sharedTimeTracker;
+        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<VL, VR>>> outerJoinStore =
+                Optional.empty();
+        private WindowStore<K, VOther> otherWindowStore;
 
         @Override
         public void init(final ProcessorContext<K, VOut> context) {
             super.init(context);
             internalProcessorContext = (InternalProcessorContext<K, VOut>) 
context;
-
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
-            droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+            droppedRecordsSensor =
+                    droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
             sharedTimeTracker = 
sharedTimeTrackerSupplier.get(context.taskId());
 
             if (enableSpuriousResultFix) {
                 outerJoinStore = 
outerJoinWindowName.map(context::getStateStore);
 
                 sharedTimeTracker.setEmitInterval(
-                    StreamsConfig.InternalConfig.getLong(
-                        context.appConfigs(),
-                        
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
-                        1000L
-                    )
+                        StreamsConfig.InternalConfig.getLong(
+                                context.appConfigs(),
+                                
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
+                                1000L
+                        )
                 );

Review Comment:
   nit: revert changes here



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -184,20 +146,34 @@ public void process(final Record<K, V1> record) {
                     // This condition below allows us to process the 
out-of-order records without the need
                     // to hold it in the temporary outer store
                     if (!outerJoinStore.isPresent() || timeTo < 
sharedTimeTracker.streamTime) {
-                        
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
+                        context().forward(
+                                record.withValue(joiner.apply(record.key(), 
record.value(), null)));
                     } else {
-                        sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-                        outerJoinStore.ifPresent(store -> store.put(
-                            TimestampedKeyAndJoinSide.make(isLeftSide, 
record.key(), inputRecordTimestamp),
-                            LeftOrRightValue.make(isLeftSide, 
record.value())));
+                        sharedTimeTracker.updatedMinTime(recordTimestamp);
+                        putInOuterJoinStore(record, recordTimestamp);
                     }
                 }
             }
         }
 
+        @Override
+        public void close() {
+            sharedTimeTrackerSupplier.remove(context().taskId());
+        }

Review Comment:
   nit: move this down near the bottom for a smaller diff



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########
@@ -89,8 +74,8 @@ public V2 getRightValue() {
     @Override
     public String toString() {
         return "<"
-            + ((leftValue != null) ? "left," + leftValue : "right," + 
rightValue)
-            + ">";
+                + ((leftValue != null) ? "left," + leftValue : "right," + 
rightValue)
+                + ">";

Review Comment:
   nit: revert this indent change



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,95 +42,73 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, 
K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements 
ProcessorSupplier<K, VThis, K, VOut> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-    private final String otherWindowName;
+    private final boolean outer;
+    private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, 
? extends VOut> joiner;
     private final long joinBeforeMs;
     private final long joinAfterMs;
     private final long joinGraceMs;
+    private final String otherWindowName;
+    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
     private final boolean enableSpuriousResultFix;
+    private final Optional<String> outerJoinWindowName;
     private final long windowsBeforeMs;
     private final long windowsAfterMs;
 
-    private final boolean outer;
-    private final boolean isLeftSide;
-    private final Optional<String> outerJoinWindowName;
-    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joiner;
-
-    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-    KStreamKStreamJoin(final boolean isLeftSide,
-                       final String otherWindowName,
-                       final JoinWindowsInternal windows,
-                       final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, ? extends VOut> joiner,
-                       final boolean outer,
-                       final Optional<String> outerJoinWindowName,
-                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-        this.isLeftSide = isLeftSide;
+    KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier 
sharedTimeTrackerSupplier,
+                       final Optional<String> outerJoinWindowName, final long 
joinBeforeMs,
+                       final long joinAfterMs, final JoinWindowsInternal 
windows, final boolean outer,
+                       final ValueJoinerWithKey<? super K, ? super VThis, ? 
super VOther, ? extends VOut> joiner) {
         this.otherWindowName = otherWindowName;
-        if (isLeftSide) {
-            this.joinBeforeMs = windows.beforeMs;
-            this.joinAfterMs = windows.afterMs;
-        } else {
-            this.joinBeforeMs = windows.afterMs;
-            this.joinAfterMs = windows.beforeMs;
-        }
-        this.windowsAfterMs = windows.afterMs;
-        this.windowsBeforeMs = windows.beforeMs;
-        this.joinGraceMs = windows.gracePeriodMs();
+        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
         this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
-        this.joiner = joiner;
-        this.outer = outer;
         this.outerJoinWindowName = outerJoinWindowName;
-        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
-    }
-
-    @Override
-    public Processor<K, V1, K, VOut> get() {
-        return new KStreamKStreamJoinProcessor();
+        this.joinBeforeMs = joinBeforeMs;
+        this.joinAfterMs = joinAfterMs;
+        this.joinGraceMs = windows.gracePeriodMs();
+        this.windowsBeforeMs = windows.beforeMs;
+        this.windowsAfterMs = windows.afterMs;
+        this.outer = outer;
+        this.joiner = joiner;
     }
 
-    private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, 
V1, K, VOut> {
-        private WindowStore<K, V2> otherWindowStore;
-        private Sensor droppedRecordsSensor;
-        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
+    protected abstract class KStreamKStreamJoinProcessor extends 
ContextualProcessor<K, VThis, K, VOut> {
         private InternalProcessorContext<K, VOut> internalProcessorContext;
+        private Sensor droppedRecordsSensor;
+
         private TimeTracker sharedTimeTracker;
+        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<VL, VR>>> outerJoinStore =
+                Optional.empty();
+        private WindowStore<K, VOther> otherWindowStore;

Review Comment:
   nit: reorder this to make the diff smaller.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########
@@ -103,7 +88,7 @@ public boolean equals(final Object o) {
         }
         final LeftOrRightValue<?, ?> that = (LeftOrRightValue<?, ?>) o;
         return Objects.equals(leftValue, that.leftValue) &&
-            Objects.equals(rightValue, that.rightValue);
+                Objects.equals(rightValue, that.rightValue);

Review Comment:
   nit: revert this indent change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to