gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1583698116


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -240,27 +185,33 @@ private void emitNonJoinedOuterRecords(
                     // There might be an outer record for the other joinSide which window has not closed yet
                     // We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
-                    if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
+                    if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs
+                            >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
+                            outerJoinLeftWindowOpen =
+                                    true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
+                            outerJoinRightWindowOpen =
+                                    true; // there are no more candidates to emit on right-outerJoin-side
                         }
                         // We continue with the next outer record
                         continue;
                     }
-                    
+
                     final K key = timestampedKeyAndJoinSide.getKey();
-                    final LeftOrRightValue<V1, V2> leftOrRightValue = next.value;
-                    final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue);
+                    final LeftOrRightValue<VL, VR> leftOrRightValue = next.value;
+                    final VThis thisValue = getThisValue(leftOrRightValue);
+                    final VOther otherValue = getOtherValue(leftOrRightValue);
+                    final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue);
                     context().forward(
-                        record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
+                            record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
                     );

Review Comment:
   nit: This could be extracted into a helper method that depends only on
`record` and `next`.
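
A minimal sketch of one possible shape for such a helper, not the PR's actual code. `Rec` and `Entry` are hypothetical stand-ins for Kafka's `Record` and the outer-join store entry, and the joiner is passed as a parameter here only so the example compiles on its own; in the real class it would be a field.

```java
import java.util.function.BiFunction;

public class OuterResultSketch {
    // Hypothetical stand-in for Kafka's Record: key, value, timestamp, plus headers to carry over.
    static final class Rec<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final String headers;

        Rec(K key, V value, long timestamp, String headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    // Hypothetical stand-in for one outer-join store entry (`next` in the diff).
    static final class Entry<K, VThis, VOther> {
        final K key;
        final long timestamp;
        final VThis thisValue;
        final VOther otherValue;

        Entry(K key, long timestamp, VThis thisValue, VOther otherValue) {
            this.key = key;
            this.timestamp = timestamp;
            this.thisValue = thisValue;
            this.otherValue = otherValue;
        }
    }

    // Builds the record to forward using only `record` and `next` (plus the joiner).
    static <K, VThis, VOther, VOut> Rec<K, VOut> toOuterResult(
            Rec<K, ?> record,
            Entry<K, VThis, VOther> next,
            BiFunction<? super VThis, ? super VOther, ? extends VOut> joiner) {
        VOut joined = joiner.apply(next.thisValue, next.otherValue);
        // Key, value and timestamp come from the stored entry; headers come from the current record.
        return new Rec<>(next.key, joined, next.timestamp, record.headers);
    }

    public static void main(String[] args) {
        Rec<String, String> current = new Rec<>("k2", "v2", 50L, "h");
        Entry<String, String, Integer> next = new Entry<>("k1", 10L, "left", null);
        Rec<String, String> out = toOuterResult(current, next, (l, r) -> l + "+" + r);
        System.out.println(out.key + " " + out.value + " " + out.timestamp); // k1 left+null 10
    }
}
```

The caller would then reduce to a single `context().forward(...)` on the returned record.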



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########


Review Comment:
   > No more unsafe type casting to be found in this class. Not that much has
changed in this abstract class. I have moved some code into methods.
   
   Very nice! I really like the new types, and the abstract methods are very 
minimal.
   
   > Side Note: I don't have the correct formatter configured :/ I couldn't 
find any contribution notes to set the correct one.
   
   I'm not aware of a standard formatter either. I remember tweaking the 
IntelliJ default rules slightly, but taking a look at them now I'm not seeing 
evidence of what I changed.
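
To illustrate why the extra type parameters remove the casts, here is a simplified sketch, assuming each subclass binds `VThis`/`VOther` to `VL`/`VR` in the appropriate order. `LeftOrRight`, `Join`, `LeftJoin` and `RightJoin` are hypothetical stand-ins, not the Kafka classes; only the names `getThisValue` and `getOtherValue` are taken from the diff.

```java
import java.util.function.BiFunction;

public class JoinSideSketch {
    // Simplified stand-in for LeftOrRightValue: exactly one side is non-null.
    static final class LeftOrRight<VL, VR> {
        final VL left;
        final VR right;

        private LeftOrRight(VL left, VR right) {
            this.left = left;
            this.right = right;
        }

        static <VL, VR> LeftOrRight<VL, VR> ofLeft(VL left) {
            return new LeftOrRight<>(left, null);
        }

        static <VL, VR> LeftOrRight<VL, VR> ofRight(VR right) {
            return new LeftOrRight<>(null, right);
        }
    }

    // Base class: the joiner is typed in terms of VThis/VOther, never VL/VR directly.
    abstract static class Join<VL, VR, VThis, VOther, VOut> {
        private final BiFunction<? super VThis, ? super VOther, ? extends VOut> joiner;

        Join(BiFunction<? super VThis, ? super VOther, ? extends VOut> joiner) {
            this.joiner = joiner;
        }

        abstract VThis getThisValue(LeftOrRight<VL, VR> value);

        abstract VOther getOtherValue(LeftOrRight<VL, VR> value);

        VOut joinWithNull(LeftOrRight<VL, VR> value) {
            return joiner.apply(getThisValue(value), getOtherValue(value));
        }
    }

    // Left side: VThis = VL, VOther = VR, so the accessors type-check without casts.
    static final class LeftJoin<VL, VR, VOut> extends Join<VL, VR, VL, VR, VOut> {
        LeftJoin(BiFunction<? super VL, ? super VR, ? extends VOut> joiner) {
            super(joiner);
        }

        @Override
        VL getThisValue(LeftOrRight<VL, VR> value) {
            return value.left;
        }

        @Override
        VR getOtherValue(LeftOrRight<VL, VR> value) {
            return value.right;
        }
    }

    // Right side: VThis = VR, VOther = VL, the mirror image of LeftJoin.
    static final class RightJoin<VL, VR, VOut> extends Join<VL, VR, VR, VL, VOut> {
        RightJoin(BiFunction<? super VR, ? super VL, ? extends VOut> joiner) {
            super(joiner);
        }

        @Override
        VR getThisValue(LeftOrRight<VL, VR> value) {
            return value.right;
        }

        @Override
        VL getOtherValue(LeftOrRight<VL, VR> value) {
            return value.left;
        }
    }

    public static void main(String[] args) {
        LeftJoin<String, Integer, String> left = new LeftJoin<>((l, r) -> l + "/" + r);
        RightJoin<String, Integer, String> right = new RightJoin<>((r, l) -> r + "/" + l);
        System.out.println(left.joinWithNull(LeftOrRight.ofLeft("a")));  // a/null
        System.out.println(right.joinWithNull(LeftOrRight.ofRight(7)));  // 7/null
    }
}
```

With this layout the compiler rejects a subclass that returns the wrong side, which is the check the old `(V1)`/`(V2)` casts bypassed.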



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##########


Review Comment:
   The left and right classes are perfect in my opinion.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -43,161 +42,106 @@
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-    private final String otherWindowName;
+    private final boolean outer;
+    private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner;
     private final long joinBeforeMs;
     private final long joinAfterMs;
     private final long joinGraceMs;
+    private final String otherWindowName;
+    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
     private final boolean enableSpuriousResultFix;
+    private final Optional<String> outerJoinWindowName;
     private final long windowsBeforeMs;
     private final long windowsAfterMs;
 
-    private final boolean outer;
-    private final boolean isLeftSide;
-    private final Optional<String> outerJoinWindowName;
-    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;
-
-    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-    KStreamKStreamJoin(final boolean isLeftSide,
-                       final String otherWindowName,
-                       final JoinWindowsInternal windows,
-                       final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
-                       final boolean outer,
-                       final Optional<String> outerJoinWindowName,
-                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-        this.isLeftSide = isLeftSide;
+    KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier,
+                       final Optional<String> outerJoinWindowName, final long joinBeforeMs,
+                       final long joinAfterMs, final JoinWindowsInternal windows, final boolean outer,
+                       final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner) {
         this.otherWindowName = otherWindowName;
-        if (isLeftSide) {
-            this.joinBeforeMs = windows.beforeMs;
-            this.joinAfterMs = windows.afterMs;
-        } else {
-            this.joinBeforeMs = windows.afterMs;
-            this.joinAfterMs = windows.beforeMs;
-        }
-        this.windowsAfterMs = windows.afterMs;
-        this.windowsBeforeMs = windows.beforeMs;
-        this.joinGraceMs = windows.gracePeriodMs();
+        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
         this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
-        this.joiner = joiner;
-        this.outer = outer;
         this.outerJoinWindowName = outerJoinWindowName;
-        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
-    }
-
-    @Override
-    public Processor<K, V1, K, VOut> get() {
-        return new KStreamKStreamJoinProcessor();
+        this.joinBeforeMs = joinBeforeMs;
+        this.joinAfterMs = joinAfterMs;
+        this.joinGraceMs = windows.gracePeriodMs();
+        this.windowsBeforeMs = windows.beforeMs;
+        this.windowsAfterMs = windows.afterMs;
+        this.outer = outer;
+        this.joiner = joiner;
     }
 
-    private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
-        private WindowStore<K, V2> otherWindowStore;
-        private Sensor droppedRecordsSensor;
-        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
+    protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> {
         private InternalProcessorContext<K, VOut> internalProcessorContext;
+        private Sensor droppedRecordsSensor;
+
         private TimeTracker sharedTimeTracker;
+        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VL, VR>>> outerJoinStore =
+                Optional.empty();
+        private WindowStore<K, VOther> otherWindowStore;
 
         @Override
         public void init(final ProcessorContext<K, VOut> context) {
             super.init(context);
             internalProcessorContext = (InternalProcessorContext<K, VOut>) context;
-
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
-            droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
+            droppedRecordsSensor =
+                    droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
             sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId());
 
             if (enableSpuriousResultFix) {
                 outerJoinStore = outerJoinWindowName.map(context::getStateStore);
 
                 sharedTimeTracker.setEmitInterval(
-                    StreamsConfig.InternalConfig.getLong(
-                        context.appConfigs(),
-                        EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
-                        1000L
-                    )
+                        StreamsConfig.InternalConfig.getLong(
+                                context.appConfigs(),
+                                EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
+                                1000L
+                        )
                 );
             }
         }
 
-        @SuppressWarnings("unchecked")
         @Override
-        public void process(final Record<K, V1> record) {
-
-            final long inputRecordTimestamp = record.timestamp();
-            final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
-            final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
-
-            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
-
-            if (outer && record.key() == null && record.value() != null) {
-                context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
+        public void process(final Record<K, VThis> thisRecord) {

Review Comment:
   nit: Personally, I don't think the names `thisRecord` and
`thisRecordTimestamp` are necessary, because the type checker will make sure
we're using the right one.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -275,24 +226,47 @@ private void emitNonJoinedOuterRecords(
             }
         }
 
-        @SuppressWarnings("unchecked")
-        private VOut getNullJoinedValue(
-            final K key, 
-            final LeftOrRightValue<V1, V2> leftOrRightValue) {
-            // depending on the JoinSide fill in the joiner key and joiner values
-            if (isLeftSide) {
-                return joiner.apply(key,
-                        leftOrRightValue.getLeftValue(),
-                        leftOrRightValue.getRightValue());
-            } else {
-                return joiner.apply(key,
-                        (V1) leftOrRightValue.getRightValue(),
-                        (V2) leftOrRightValue.getLeftValue());
+
+        private void performInnerJoin(final Record<K, VThis> thisRecord, final long thisRecordTimestamp) {

Review Comment:
   nit: I think this is a somewhat confusing method name: it mentions
InnerJoin, but it uses needOuterJoin and interacts with the outerJoinStore. I
think this should be inlined back into process().



