gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1540138631


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########
@@ -39,45 +39,6 @@ private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
         this.rightValue = rightValue;
     }
 
-    /**
-     * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and
-     * V2 value as null.
-     *
-     * @param leftValue the left V1 value
-     * @param <V1>      the type of the value
-     * @return a new {@link LeftOrRightValue} instance
-     */
-    public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {

Review Comment:
   I think you should keep the `makeLeftValue` and `makeRightValue` functions 
as entry points to this class. Only the impossible-to-type `make` function 
should be removed.
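
   A rough sketch of what the typed entry points buy us, using a simplified
stand-in rather than the real class (`LeftOrRight` and its accessor names are
made up for illustration): each factory accepts only the type that belongs on
its side, so no cast and no `unchecked` suppression is needed.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for LeftOrRightValue; not the class from the PR.
final class LeftOrRight<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRight(final V1 leftValue, final V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    // Only a V1 can be passed here, so the compiler checks the side for us.
    static <V1, V2> LeftOrRight<V1, V2> makeLeftValue(final V1 leftValue) {
        Objects.requireNonNull(leftValue, "The left join value is null");
        return new LeftOrRight<>(leftValue, null);
    }

    // Only a V2 can be passed here.
    static <V1, V2> LeftOrRight<V1, V2> makeRightValue(final V2 rightValue) {
        Objects.requireNonNull(rightValue, "The right join value is null");
        return new LeftOrRight<>(null, rightValue);
    }

    // Accessor names are assumptions made for this sketch.
    V1 leftValue() {
        return leftValue;
    }

    V2 rightValue() {
        return rightValue;
    }
}
```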



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -250,7 +252,7 @@ private void emitNonJoinedOuterRecords(
                     }
 
                     final VOut nullJoinedValue;
-                    if (isLeftSide) {
+                    if (joinSide.isLeftSide()) {

Review Comment:
   This is another unsafe branch whose logic should move into the enum.
   Try to eliminate all of the `unchecked` warnings in this class.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -155,7 +156,8 @@ public void process(final Record<K, V1> record) {
                         // we may delete some values with the same key early but since we are going
                         // range over all values of the same key even after failure, since the other window-store
                         // is only cleaned up by stream time, so this is okay for at-least-once.
-                        store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null);
+                        final JoinSide otherJoinSide = joinSide.isLeftSide() ? JoinSide.RIGHT : JoinSide.LEFT;

Review Comment:
   This could be a `JoinSide#complement` or `JoinSide#opposite` function, in 
case there are other situations where we need to know what the other side of 
the join is doing.
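
   A minimal sketch of such a method, assuming the name `opposite`. The enum
below is a stripped-down, hypothetical stand-in (`JoinSideSketch`), not the
enum from this PR.

```java
// Hypothetical stand-in for JoinSide, reduced to the two constants.
enum JoinSideSketch {
    LEFT,
    RIGHT;

    /** Returns the other side of the join. */
    JoinSideSketch opposite() {
        return this == LEFT ? RIGHT : LEFT;
    }
}
```

   With an equivalent method on the real enum, the ternary in this hunk could
be reduced to a single `joinSide.opposite()` call.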



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -173,7 +174,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
 
         final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
         final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
-            true,
+            JoinSide.LEFT,

Review Comment:
   I think these call sites are much clearer with the explicit "left" and 
"right". It's interesting that the variables here are `joinThis` and `joinOther`, 
instead of `joinLeft` and `joinRight`.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based on the side specified.
+ */
+@SuppressWarnings("unchecked")
+public enum JoinSide {
+    LEFT("left") {
+        /**
+         * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and V2 value as null.
+         *
+         * @param leftValue the left V1 value
+         * @param <V1> the type of the value
+         * @return a new {@link LeftOrRightValue} instance
+         */
+        @Override
+        public <V, V1, V2> LeftOrRightValue<V1, V2> make(final V leftValue) {
+            Objects.requireNonNull(leftValue, "The left join value is null");
+            return (LeftOrRightValue<V1, V2>) new LeftOrRightValue<>(leftValue, null);
+        }
+    },
+
+    RIGHT("right") {
+        /**
+         * Create a new {@link LeftOrRightValue} instance with the V2 value as {@code rightValue} and V1 value as null.
+         *
+         * @param rightValue the right V2 value
+         * @param <V2> the type of the value
+         * @return a new {@link LeftOrRightValue} instance
+         */
+        @Override
+        public <V, V1, V2> LeftOrRightValue<V1, V2> make(final V rightValue) {
+            Objects.requireNonNull(rightValue, "The left join value is null");
+            return (LeftOrRightValue<V1, V2>) new LeftOrRightValue<>(null, rightValue);
+        }
+
+    };
+
+    private final String joinSideName;
+
+    JoinSide(final String joinSideName) {
+        this.joinSideName = joinSideName;
+    }
+
+    public abstract <V, V1, V2> LeftOrRightValue<V1, V2> make(final V value);
+
+    /**
+     * Returns true if this JoinSide represents the left side.
+     *
+     * @return true if this JoinSide represents the left side, otherwise false
+     */
+    public boolean isLeftSide() {

Review Comment:
   This is a very permissive "escape hatch" from this enum, and could be hiding 
other abstractions that could be pulled into the enum.
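
   To illustrate the kind of abstraction I have in mind, here is a hedged
sketch in which the enum itself answers "which of these two belongs to my
side?", so callers never branch on a boolean. The enum name `SelectingSide`
and the method `select` are hypothetical and do not exist in this PR.

```java
// Hypothetical sketch: side-specific behavior lives on the constants.
enum SelectingSide {
    LEFT {
        @Override
        <T> T select(final T left, final T right) {
            return left;
        }
    },
    RIGHT {
        @Override
        <T> T select(final T left, final T right) {
            return right;
        }
    };

    /** Returns the argument that belongs to this side. */
    abstract <T> T select(T left, T right);
}
```

   A caller that currently writes `if (joinSide.isLeftSide()) { ... } else { ... }`
to pick between two values could call such a method instead, which would leave
less reason to expose `isLeftSide()` at all.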


