Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-29 Thread via GitHub


cadonna merged PR #15601:
URL: https://github.com/apache/kafka/pull/15601


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-28 Thread via GitHub


raminqaf commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2135147503

   > @raminqaf Could you please rebase this PR on current trunk to get the new 
build setup? We need to restart the builds because one was red.
   
   Done! 





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-28 Thread via GitHub


cadonna commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2135043687

   @raminqaf Could you please rebase this PR on current trunk to get the new 
build setup? We need to restart the builds because one was red. 





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-21 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1607786468


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -187,17 +157,25 @@ public void process(final Record record) {
 
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
 } else {
 sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-outerJoinStore.ifPresent(store -> store.put(
-TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
-LeftOrRightValue.make(isLeftSide, record.value())));
+putInOuterJoinStore(record);
 }
 }
 }
 }
 
+protected abstract TimestampedKeyAndJoinSide makeThisKey(final K 
key, final long inputRecordTimestamp);
+
+protected abstract LeftOrRightValue makeThisValue(final 
VThis thisValue);
+
+protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K 
key, final long timestamp);
+
+protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue);
+
+protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue);
+
 private void emitNonJoinedOuterRecords(
-final KeyValueStore, 
LeftOrRightValue> store,
-final Record record) {
+final KeyValueStore, 
LeftOrRightValue> store,
+final Record record) {

Review Comment:
   Good catch! Avoiding the wildcard here makes it easier to read and 
understand. I have changed it!






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-17 Thread via GitHub


cadonna commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1605020116


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+
+import java.util.Optional;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {

Review Comment:
   I would still prefer `KStreamKStreamJoinLeftSide`. It is the left side of a 
stream-stream join. The processors are also named 
`KStreamKStreamJoinLeftProcessor`.






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-17 Thread via GitHub


cadonna commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1604937635


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -187,17 +157,25 @@ public void process(final Record record) {
 
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
 } else {
 sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-outerJoinStore.ifPresent(store -> store.put(
-TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
-LeftOrRightValue.make(isLeftSide, record.value())));
+putInOuterJoinStore(record);
 }
 }
 }
 }
 
+protected abstract TimestampedKeyAndJoinSide makeThisKey(final K 
key, final long inputRecordTimestamp);
+
+protected abstract LeftOrRightValue makeThisValue(final 
VThis thisValue);
+
+protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K 
key, final long timestamp);
+
+protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue);
+
+protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue);
+
 private void emitNonJoinedOuterRecords(
-final KeyValueStore, 
LeftOrRightValue> store,
-final Record record) {
+final KeyValueStore, 
LeftOrRightValue> store,
+final Record record) {

Review Comment:
   nit:
   ```java
   private void emitNonJoinedOuterRecords(final 
KeyValueStore, LeftOrRightValue> 
store, 
  final Record record) {
   
   ```



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -275,20 +246,22 @@ private void emitNonJoinedOuterRecords(
 }
 }
 
-@SuppressWarnings("unchecked")
-private VOut getNullJoinedValue(
-final K key, 
-final LeftOrRightValue leftOrRightValue) {
-// depending on the JoinSide fill in the joiner key and joiner 
values
-if (isLeftSide) {
-return joiner.apply(key,
-leftOrRightValue.getLeftValue(),
-leftOrRightValue.getRightValue());
-} else {
-return joiner.apply(key,
-(V1) leftOrRightValue.getRightValue(),
-(V2) leftOrRightValue.getLeftValue());
-}
+private void forwardNonJoinedOuterRecords(final Record record, 
final KeyValue, ? extends 
LeftOrRightValue> nextKeyValue) {

Review Comment:
   I think it would be simpler to define this method as:
   ```java
   private void forwardNonJoinedOuterRecords(final Record record,
 final TimestampedKeyAndJoinSide 
timestampedKeyAndJoinSide,
 final LeftOrRightValue leftOrRightValue) {
   ```
   It makes the code a bit simpler and shorter.
   Also here, why not `Record<K, VThis> record`? That is exactly the type used 
at the call site.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -187,17 +157,25 @@ public void process(final Record record) {
 
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
 } else {
 sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-outerJoinStore.ifPresent(store -> store.put(
-TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
-LeftOrRightValue.make(isLeftSide, record.value())));
+putInOuterJoinStore(record);
 }
 }
 }
 }
 
+protected abstract TimestampedKeyAndJoinSide makeThisKey(final K 
key, final long inputRecordTimestamp);
+
+protected abstract LeftOrRightValue makeThisValue(final 
VThis thisValue);
+
+protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K 
key, final long timestamp);
+
+protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue);
+
+protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue);
+
 private void emitNonJoinedOuterRecords(
-final KeyValueStore, 
LeftOrRightValue> store,
-final Record record) {
+final KeyValueStore, 
LeftOrRightValue> store,
+final Record record) {

Review Comment:
   Shouldn't that be
   ```java
   final Record<K, VThis> record
   ```
   ?



##

Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-14 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1600269458


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,7 +42,7 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {

Review Comment:
   Just to throw my own explanation in here:
   VLeft and VRight are "absolute" in that they are the left and right types of 
the overall join. Both sides of the join have equivalent VLeft and VRight 
types, because they share a common outerJoinStore instance.
   
   VThis and VOther are "relative" in that they are the type of records 
entering "this" side of the join, and the "other" side of the join, and this is 
necessarily swapped for the other side of the join.
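
The absolute/relative distinction described above can be sketched in a few lines. This is only an illustration under stated assumptions: the `Sketch*` classes are hypothetical stand-ins and not the classes touched by this PR. Only the idea of two absolute parameters (`VLeft`, `VRight`) plus two relative ones (`VThis`, `VOther`) that are swapped on the right side is taken from the discussion.

```java
import java.util.Objects;

// Hypothetical stand-in for a value that holds either a left or a right value.
final class SketchLeftOrRight<L, R> {
    final L left;
    final R right;

    private SketchLeftOrRight(final L left, final R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> SketchLeftOrRight<L, R> ofLeft(final L left) {
        return new SketchLeftOrRight<>(Objects.requireNonNull(left), null);
    }

    static <L, R> SketchLeftOrRight<L, R> ofRight(final R right) {
        return new SketchLeftOrRight<>(null, Objects.requireNonNull(right));
    }
}

// VLeft/VRight are absolute (shared by both sides); VThis/VOther are relative.
abstract class SketchJoinSide<VLeft, VRight, VThis, VOther> {
    abstract SketchLeftOrRight<VLeft, VRight> makeThisValue(VThis thisValue);

    abstract VThis getThisValue(SketchLeftOrRight<VLeft, VRight> value);

    abstract VOther getOtherValue(SketchLeftOrRight<VLeft, VRight> value);
}

// Left side: VThis = VLeft, VOther = VRight.
final class SketchLeftSide<VLeft, VRight> extends SketchJoinSide<VLeft, VRight, VLeft, VRight> {
    @Override
    SketchLeftOrRight<VLeft, VRight> makeThisValue(final VLeft thisValue) {
        return SketchLeftOrRight.ofLeft(thisValue);
    }

    @Override
    VLeft getThisValue(final SketchLeftOrRight<VLeft, VRight> value) {
        return value.left;
    }

    @Override
    VRight getOtherValue(final SketchLeftOrRight<VLeft, VRight> value) {
        return value.right;
    }
}

// Right side: the relative parameters are swapped, VThis = VRight, VOther = VLeft.
final class SketchRightSide<VLeft, VRight> extends SketchJoinSide<VLeft, VRight, VRight, VLeft> {
    @Override
    SketchLeftOrRight<VLeft, VRight> makeThisValue(final VRight thisValue) {
        return SketchLeftOrRight.ofRight(thisValue);
    }

    @Override
    VRight getThisValue(final SketchLeftOrRight<VLeft, VRight> value) {
        return value.right;
    }

    @Override
    VLeft getOtherValue(final SketchLeftOrRight<VLeft, VRight> value) {
        return value.left;
    }
}
```

Because both subclasses share the same `SketchLeftOrRight<VLeft, VRight>` type, they could write into one common store, while neither needs a cast or an `isLeftSide` flag to pick the correct accessor.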






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-14 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1600077088


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,7 +42,7 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {

Review Comment:
   We needed a way to detect the type of the side being called dynamically, so 
that the superclass can handle records based on the caller. This also ensures 
type safety.
   
   When this class is extended by the _left side_ of the join, `VThis` is the 
_left_ type (`VLeft`) and `VOther` is the _right_ type (`VRight`).
   
   ```java
   class KStreamKStreamLeftJoinSide<K, VLeft, VRight, VOut>
       extends KStreamKStreamJoin<K, VLeft, VRight, VOut, VLeft, VRight> {
                                                          --^--  --^---
                                                          VThis  VOther
   ```
   
   Whereas for the _right side_ of the join, `VThis` is the _right_ type 
(`VRight`) and `VOther` is the _left_ type (`VLeft`).
   
   ```java
   class KStreamKStreamRightJoinSide<K, VLeft, VRight, VOut>
       extends KStreamKStreamJoin<K, VLeft, VRight, VOut, VRight, VLeft> {
                                                          --^---  --^--
                                                          VThis   VOther
   ```
   






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-14 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1600088096


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,7 +42,7 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {

Review Comment:
   It is more tangible if we look at the abstract methods:
   ```java
   protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue);
   
   protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue);
   ```
   Depending on which join side implements these methods, `VThis` and `VOther` 
can change. For instance, the `getThisValue()` method on the _left side_ of the 
join would be:
   ```java
   @Override
   public VLeft getThisValue(final LeftOrRightValue leftOrRightValue) {
       return leftOrRightValue.getLeftValue();
   }
   ```
   Respectively, on the _right side_ we have:
   ```java
   @Override
   public VRight getThisValue(final LeftOrRightValue leftOrRightValue) {
       return leftOrRightValue.getRightValue();
   }
   ```






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-14 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1600076976


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+
+import java.util.Optional;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {

Review Comment:
   Good catch. What do you think about `KStreamKStreamLeftJoinSide` instead?







Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-14 Thread via GitHub


cadonna commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r166724


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,7 +42,7 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {

Review Comment:
   I do not get the difference between `VL` and `VThis`, or between `VR` and 
`VOther`. Could you please elaborate?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+
+import java.util.Optional;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {

Review Comment:
   Do you mean that this is the left side of the join or that this is a left 
join? The current name is a bit confusing. A better name might be 
`KStreamKStreamJoinLeftSide`.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,7 +42,7 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {

Review Comment:
   Also a better name for `VL` and `VR` would be `VLeft` and `VRight`, 
respectively. `VR` is sometimes used for result value type.  






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-08 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1593500243


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords(
 // reset to MAX_VALUE in case the store is empty
 sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
+try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
-boolean outerJoinLeftWindowOpen = false;
-boolean outerJoinRightWindowOpen = false;
 while (it.hasNext()) {
-if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
-// if windows are open for both joinSides we can break 
since there are no more candidates to emit
+final KeyValue, 
LeftOrRightValue> nextKeyValue = it.next();
+final TimestampedKeyAndJoinSide 
timestampedKeyAndJoinSide = nextKeyValue.key;
+sharedTimeTracker.minTime = 
timestampedKeyAndJoinSide.getTimestamp();
+if 
(isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && 
isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) {

Review Comment:
   Yeah, you are right! That's a good catch. I was depending too much on the 
tests. I couldn't find any test that triggered the condition even after 
reverting the changes.






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


ableegoldman commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2099136409

   Thanks Greg. If no one's had time to look at this by next week, we'll assign 
a reviewer during the next Kafka Streams hangout.





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1592924655


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -187,17 +157,25 @@ public void process(final Record record) {
 
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
 } else {
 sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-outerJoinStore.ifPresent(store -> store.put(
-TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
-LeftOrRightValue.make(isLeftSide, record.value())));
+putInOuterJoinStore(record, inputRecordTimestamp);

Review Comment:
   This can just take `record` and call `record.timestamp()` itself.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords(
 // reset to MAX_VALUE in case the store is empty
 sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
+try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
-boolean outerJoinLeftWindowOpen = false;
-boolean outerJoinRightWindowOpen = false;
 while (it.hasNext()) {
-if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
-// if windows are open for both joinSides we can break 
since there are no more candidates to emit
+final KeyValue, 
LeftOrRightValue> nextKeyValue = it.next();
+final TimestampedKeyAndJoinSide 
timestampedKeyAndJoinSide = nextKeyValue.key;
+sharedTimeTracker.minTime = 
timestampedKeyAndJoinSide.getTimestamp();
+if 
(isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && 
isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) {

Review Comment:
   Can this condition ever fire?
   If `timestampedKeyAndJoinSide.isLeftSide()` is true, then only the first 
condition can be true.
   If `timestampedKeyAndJoinSide.isLeftSide()` is false, then only the second 
condition can be true.
   
   Previously, these were state variables shared across multiple iterations, so 
they incorporated multiple `timestampedKeyAndJoinSide` values. Now the condition 
is a function of just a single `timestampedKeyAndJoinSide`. Without knowing the 
context here, I would guess that this changes the behavior.
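
The concern above can be made concrete with a small sketch. It assumes a simplified stand-in key (`SketchKey`) and reduces the window check to a plain timestamp comparison against a hypothetical `cutoff`; neither is the PR's code. It shows that the per-key conjunction can never be true, whereas flags carried across iterations (in the spirit of the removed `outerJoinLeftWindowOpen` and `outerJoinRightWindowOpen` variables) can both become true.

```java
import java.util.List;

// Hypothetical stand-in for a store key: a timestamp plus the side it came from.
final class SketchKey {
    final long timestamp;
    final boolean leftSide;

    SketchKey(final long timestamp, final boolean leftSide) {
        this.timestamp = timestamp;
        this.leftSide = leftSide;
    }
}

final class SketchWindowCheck {
    // Assumption: a window counts as "open" when the key is newer than the cutoff.
    static boolean isOpen(final SketchKey key, final long cutoff) {
        return key.timestamp > cutoff;
    }

    // Shaped like the per-key helper in the diff: open AND on the requested side.
    static boolean isOpenForSide(final SketchKey key, final long cutoff, final boolean leftSide) {
        return isOpen(key, cutoff) && key.leftSide == leftSide;
    }

    // Per-key conjunction: key.leftSide cannot equal both true and false,
    // so this always returns false.
    static boolean perKeyBothOpen(final SketchKey key, final long cutoff) {
        return isOpenForSide(key, cutoff, true) && isOpenForSide(key, cutoff, false);
    }

    // Flags accumulated across iterations: they combine information from
    // several keys, so both can end up true.
    static boolean accumulatedBothOpen(final List<SketchKey> keys, final long cutoff) {
        boolean leftOpen = false;
        boolean rightOpen = false;
        for (final SketchKey key : keys) {
            if (isOpen(key, cutoff)) {
                if (key.leftSide) {
                    leftOpen = true;
                } else {
                    rightOpen = true;
                }
            }
            if (leftOpen && rightOpen) {
                return true;
            }
        }
        return false;
    }
}
```

In this sketch an early exit based on `perKeyBothOpen` would be dead code, which is one way the refactoring could silently change behavior without any test noticing.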



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -275,22 +240,34 @@ private void emitNonJoinedOuterRecords(
 }
 }
 
-@SuppressWarnings("unchecked")
-private VOut getNullJoinedValue(
-final K key, 
-final LeftOrRightValue leftOrRightValue) {
-// depending on the JoinSide fill in the joiner key and joiner 
values
-if (isLeftSide) {
-return joiner.apply(key,
-leftOrRightValue.getLeftValue(),
-leftOrRightValue.getRightValue());
-} else {
-return joiner.apply(key,
-(V1) leftOrRightValue.getRightValue(),
-(V2) leftOrRightValue.getLeftValue());
+private boolean isOuterJoinWindowOpenForSide(final 
TimestampedKeyAndJoinSide timestampedKeyAndJoinSide, final boolean 
isLeftSide) {
+if (isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) {
+// there are no more candidates to emit on left-outerJoin-side
+return timestampedKeyAndJoinSide.isLeftSide() == isLeftSide;
 }
+return false;
+}
+
+private void forwardNonJoinedOuterRecords(final Record record, 
final KeyValue, ? extends 
LeftOrRightValue> nextKeyValue) {
+final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = 
nextKeyValue.key;
+final K key = timestampedKeyAndJoinSide.getKey();
+final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
+final LeftOrRightValue leftOrRightValue = 
nextKeyValue.value;
+final VThis thisValue = getThisValue(leftOrRightValue);
+final VOther otherValue = getOtherValue(leftOrRightValue);
+final VOut nullJoinedValue = joiner.apply(key, thisValue, 
otherValue);
+context().forward(
+
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
+);
+}
+
+private boolean isOuterJoinWindowOpen(final 
TimestampedKeyAndJoinSide timestampedKeyAndJoinSide) {
+final 

Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


raminqaf commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2098817942

   @gharris1727 Thanks for the feedback! I reverted all the changes you 
requested, along with a couple of other indentation changes that caused a 
diff. I can go even further and revert and inline the newly introduced private 
methods (e.g., `emitInnerJoin` or `putInOuterJoinStore`).





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1591293880


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -184,20 +146,34 @@ public void process(final Record record) {
 // This condition below allows us to process the 
out-of-order records without the need
 // to hold it in the temporary outer store
 if (!outerJoinStore.isPresent() || timeTo < 
sharedTimeTracker.streamTime) {
-
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
+context().forward(
+record.withValue(joiner.apply(record.key(), 
record.value(), null)));

Review Comment:
   nit: revert



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,95 +42,73 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {
 private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-private final String otherWindowName;
+private final boolean outer;
+private final ValueJoinerWithKey joiner;
 private final long joinBeforeMs;
 private final long joinAfterMs;
 private final long joinGraceMs;
+private final String otherWindowName;
+private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 private final boolean enableSpuriousResultFix;
+private final Optional outerJoinWindowName;
 private final long windowsBeforeMs;
 private final long windowsAfterMs;
 
-private final boolean outer;
-private final boolean isLeftSide;
-private final Optional outerJoinWindowName;
-private final ValueJoinerWithKey joiner;
-
-private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-KStreamKStreamJoin(final boolean isLeftSide,
-   final String otherWindowName,
-   final JoinWindowsInternal windows,
-   final ValueJoinerWithKey joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-this.isLeftSide = isLeftSide;
+KStreamKStreamJoin(final String otherWindowName, final TimeTrackerSupplier 
sharedTimeTrackerSupplier,
+   final Optional outerJoinWindowName, final long 
joinBeforeMs,
+   final long joinAfterMs, final JoinWindowsInternal 
windows, final boolean outer,
+   final ValueJoinerWithKey joiner) {

Review Comment:
   Revert the unnecessary re-ordering here, so it's easy to see the signature 
change for `joiner` and the removal of `isLeftSide`.
   
   We should also put each constructor argument on its own line to follow the 
previous style.




Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-30 Thread via GitHub


raminqaf commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2084636567

   @gharris1727 Thanks for the positive feedback! I am happy that you like the 
changes! I addressed all your nit reviews and refactored the logic of the 
non-final boolean variables (i.e., `needOuterJoin`, `outerJoinLeftWindowOpen`, 
`outerJoinRightWindowOpen`). Please have a look! 
   
   Tiny knowledge sharing: The `LeftOrRightValue` class represents a union type 
(a value is either left or right, never both at once). In Java 21+ (I know, a 
very high Java version :D), this is solved with sealed interfaces and records. 
I was reading this article recently and thought about the class: 
https://ifesunmola.com/sum-types-in-java/
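
   To make the idea concrete, here is a rough sketch of such a sum type. The 
names `Side`, `Left`, `Right`, and `SumTypeSketch` are made up for illustration 
and are not part of Kafka. Sealed interfaces and records are final features as 
of Java 17; the sketch uses `instanceof` patterns so that it does not depend on 
the pattern-matching `switch` that became final in Java 21.

```java
// Hypothetical sum type: a Side is either a Left or a Right, never both.
// With no permits clause, the permitted subtypes are those declared in this file.
sealed interface Side<L, R> {
    record Left<L, R>(L value) implements Side<L, R> { }

    record Right<L, R>(R value) implements Side<L, R> { }
}

final class SumTypeSketch {
    // Each branch gets a correctly typed value without any unchecked cast.
    static <L, R> String describe(final Side<L, R> side) {
        if (side instanceof Side.Left<L, R> left) {
            return "left=" + left.value();
        }
        if (side instanceof Side.Right<L, R> right) {
            return "right=" + right.value();
        }
        // Unreachable because Side is sealed; kept so the method compiles on Java 17.
        throw new IllegalStateException("unknown Side implementation");
    }

    public static void main(final String[] args) {
        System.out.println(describe(new Side.Left<String, Integer>("a")));
        System.out.println(describe(new Side.Right<String, Integer>(42)));
    }
}
```

   Compared with a class holding two nullable fields, the "exactly one side" 
invariant is enforced here by the type itself rather than by a runtime check.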





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-29 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1583698116


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -240,27 +185,33 @@ private void emitNonJoinedOuterRecords(
 // There might be an outer record for the other joinSide 
which window has not closed yet
 // We rely on the <timestamp><left/right-boolean><key> 
ordering of KeyValueIterator
 final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
-if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs
+>= sharedTimeTracker.streamTime) {
 if (timestampedKeyAndJoinSide.isLeftSide()) {
-outerJoinLeftWindowOpen = true; // there are no 
more candidates to emit on left-outerJoin-side
+outerJoinLeftWindowOpen =
+true; // there are no more candidates to 
emit on left-outerJoin-side
 } else {
-outerJoinRightWindowOpen = true; // there are no 
more candidates to emit on right-outerJoin-side
+outerJoinRightWindowOpen =
+true; // there are no more candidates to 
emit on right-outerJoin-side
 }
 // We continue with the next outer record
 continue;
 }
-
+
 final K key = timestampedKeyAndJoinSide.getKey();
-final LeftOrRightValue leftOrRightValue = 
next.value;
-final VOut nullJoinedValue = getNullJoinedValue(key, 
leftOrRightValue);
+final LeftOrRightValue leftOrRightValue = 
next.value;
+final VThis thisValue = getThisValue(leftOrRightValue);
+final VOther otherValue = getOtherValue(leftOrRightValue);
+final VOut nullJoinedValue = joiner.apply(key, thisValue, 
otherValue);
 context().forward(
-
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
+
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
 );

Review Comment:
   nit: This could be brought out to a method which depends only on `record` 
and `next`.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##


Review Comment:
   > No more unsafe type casting to be found in this class. Not that much has 
changed in this abstract class. I have moved some code into methods.
   
   Very nice! I really like the new types, and the abstract methods are very 
minimal.
   
   > Side Note: I don't have the correct formatter configured :/ I couldn't 
find any contribution notes to set the correct one.
   
   I'm not aware of a standard formatter either. I remember tweaking the 
IntelliJ default rules slightly, but taking a look at them now I'm not seeing 
evidence of what I changed.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##


Review Comment:
   The left and right classes are perfect in my opinion.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -43,161 +42,106 @@
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
-class KStreamKStreamJoin implements ProcessorSupplier {
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {
 private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
-
-private final String otherWindowName;
+private final boolean outer;
+private final ValueJoinerWithKey joiner;
 private final long joinBeforeMs;
 private final long joinAfterMs;
 private final long joinGraceMs;
+private final String otherWindowName;
+private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 private final boolean enableSpuriousResultFix;
+private final Optional outerJoinWindowName;
 private final long windowsBeforeMs;
 private final long windowsAfterMs;
 
-private final boolean outer;
-private final boolean isLeftSide;
-private final Optional outerJoinWindowName;
-private final ValueJoinerWithKey joiner;
-
-private final TimeTrackerSupplier sharedTimeTrackerSupplier;
-
-KStreamKStreamJoin(final boolean isLeftSide,
-   final String 

Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-13 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1563938474


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##


Review Comment:
   No more unsafe type casting to be found in this class. Not that much has 
changed in this abstract class. I have moved some code into methods. 
   
   Side Note: I don't have the correct formatter configured :/ I couldn't find 
any contribution notes to set the correct one.






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-13 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1563929782


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##
@@ -63,21 +63,6 @@ public static  LeftOrRightValue 
makeRightValue(final V2 rightVal
 return new LeftOrRightValue<>(null, rightValue);
 }
 
-/**
- * Create a new {@link LeftOrRightValue} instance with the V value as 
{@code leftValue} if
- * {@code isLeftSide} is True; otherwise {@code rightValue} if {@code 
isLeftSide} is False.
- *
- * @param value the V value (either V1 or V2 type)
- * @param <V> the type of the value
- * @return a new {@link LeftOrRightValue} instance
- */
-public static  LeftOrRightValue make(final boolean isLeftSide, final V 
value) {
-Objects.requireNonNull(value, "value is null");
-return isLeftSide
-? LeftOrRightValue.makeLeftValue(value)
-: LeftOrRightValue.makeRightValue(value);
-}
-

Review Comment:
   Removed as promised  



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##


Review Comment:
   Not that much has changed in this abstract class. I have moved some code 
into methods. 



##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java:
##
@@ -52,7 +52,26 @@ private TimestampedKeyAndJoinSide(final boolean leftSide, 
final K key, final lon
 public static  TimestampedKeyAndJoinSide make(final boolean 
leftSide, final K key, final long timestamp) {
 return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp);
 }
-
+/**
+ * Create a new {@link TimestampedKeyAndJoinSide} instance for the left 
join side if the provided {@code key} is not {@code null}.
+ *
+ * @param key  the key
+ * @param <K>  the type of the key
+ * @return a new {@link TimestampedKeyAndJoinSide} instance for the left 
join side if the provided {@code key} is not {@code null}
+ */
+public static  TimestampedKeyAndJoinSide makeLeft(final K key, final 
long timestamp) {
+return new TimestampedKeyAndJoinSide<>(true, key, timestamp);
+}
+/**
+ * Create a new {@link TimestampedKeyAndJoinSide} instance for the right 
join side if the provided {@code key} is not {@code null}.
+ *
+ * @param key  the key
+ * @param <K>  the type of the key
+ * @return a new {@link TimestampedKeyAndJoinSide} instance for the right 
join side if the provided {@code key} is not {@code null}
+ */
+public static  TimestampedKeyAndJoinSide makeRight(final K key, 
final long timestamp) {
+return new TimestampedKeyAndJoinSide<>(false, key, timestamp);
+}

Review Comment:
   @gharris1727 I have added these static factory methods for a cleaner 
definition. If you prefer, I can have them removed and use the `make` method.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##


Review Comment:
   A clean abstraction. Each join side defines its "this side value" and "other 
side value". 
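
   For readers without the full diff at hand, the shape of the abstraction is 
roughly the following. This is only an illustrative sketch: `EitherValue`, 
`SideJoinSketch`, and the two subclasses are simplified, hypothetical stand-ins 
for `LeftOrRightValue` and the join classes, not the code in the PR.

```java
// Hypothetical, simplified stand-in for LeftOrRightValue.
final class EitherValue<L, R> {
    private final L left;
    private final R right;

    private EitherValue(final L left, final R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> EitherValue<L, R> ofLeft(final L value) {
        return new EitherValue<>(value, null);
    }

    static <L, R> EitherValue<L, R> ofRight(final R value) {
        return new EitherValue<>(null, value);
    }

    L left() {
        return left;
    }

    R right() {
        return right;
    }
}

// The base class never asks "am I the left side?"; subclasses answer it through types.
abstract class SideJoinSketch<L, R, VThis, VOther> {
    abstract VThis thisValue(EitherValue<L, R> value);

    abstract VOther otherValue(EitherValue<L, R> value);

    String describe(final EitherValue<L, R> value) {
        return "this=" + thisValue(value) + ", other=" + otherValue(value);
    }
}

final class LeftSideJoinSketch<L, R> extends SideJoinSketch<L, R, L, R> {
    @Override
    L thisValue(final EitherValue<L, R> value) {
        return value.left();
    }

    @Override
    R otherValue(final EitherValue<L, R> value) {
        return value.right();
    }
}

final class RightSideJoinSketch<L, R> extends SideJoinSketch<L, R, R, L> {
    @Override
    R thisValue(final EitherValue<L, R> value) {
        return value.right();
    }

    @Override
    L otherValue(final EitherValue<L, R> value) {
        return value.left();
    }
}
```

   Because the "this" and "other" types are fixed by each subclass, the shared 
logic in the base class needs neither an `isLeftSide` flag nor an unchecked cast.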






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-05 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1554016667


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java:
##
@@ -33,28 +34,36 @@
 public class TimestampedKeyAndJoinSide {
 private final K key;
 private final long timestamp;
-private final boolean leftSide;
+private final JoinSide joinSide;
 
-private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, 
final long timestamp) {
+private TimestampedKeyAndJoinSide(final JoinSide joinSide, final K key, 
final long timestamp) {
 this.key = Objects.requireNonNull(key, "key cannot be null");
-this.leftSide = leftSide;
+this.joinSide = joinSide;
 this.timestamp = timestamp;
 }
 
 /**
- * Create a new {@link TimestampedKeyAndJoinSide} instance if the provide 
{@code key} is not {@code null}.
+ * Create a new {@link TimestampedKeyAndJoinSide} instance if the provided 
{@code key} is not {@code null}.
  *
- * @param leftSide True if the key is part of the left join side; False if 
it is from the right join side
+ * @param joinSide Whether the key is part of the {@link JoinSide#LEFT} 
side; or it is from the {@link JoinSide#RIGHT} side
  * @param key  the key
  * @param <K>  the type of the key
- * @return a new {@link TimestampedKeyAndJoinSide} instance if the provide 
{@code key} is not {@code null}
+ * @return a new {@link TimestampedKeyAndJoinSide} instance if the 
provided {@code key} is not {@code null}
  */
-public static  TimestampedKeyAndJoinSide make(final boolean 
leftSide, final K key, final long timestamp) {
-return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp);
+public static  TimestampedKeyAndJoinSide make(final JoinSide 
joinSide, final K key, final long timestamp) {

Review Comment:
   Since this is only used in tests now, I think you can remove this and 
replace the test call-sites with the new functions.
   
   Make sure to copy the javadoc to the new signatures too.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Optional;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
+
+KStreamKStreamLeftJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner,
+final boolean outer,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+super(otherWindowName, windows, windows.beforeMs, windows.afterMs, 
joiner, outerJoinWindowName,
+sharedTimeTrackerSupplier, outer);
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamLeftJoinProcessor();
+}
+
+private class KStreamKStreamLeftJoinProcessor extends 
KStreamKStreamJoinProcessor {
+@Override
+public void process(final Record leftRecord) {
+final long inputRecordTimestamp = leftRecord.timestamp();
+final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
+final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+

Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545788943


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java:
##
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Optional;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamRightJoin extends KStreamKStreamJoin {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamRightJoin.class);
+
+
+KStreamKStreamRightJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner,
+final boolean outer,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+super(otherWindowName, windows, windows.afterMs, windows.beforeMs, 
joiner, outerJoinWindowName,
+sharedTimeTrackerSupplier, outer);
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamRightJoinProcessor();
+}
+
+private class KStreamKStreamRightJoinProcessor extends 
KStreamKStreamJoinProcessor {
+@Override
+public void process(final Record rightRecord) {
+final long inputRecordTimestamp = rightRecord.timestamp();
+final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
+final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+if (outer && rightRecord.key() == null && rightRecord.value() != 
null) {
+
context().forward(rightRecord.withValue(joiner.apply(rightRecord.key(), 
rightRecord.value(), null)));
+return;
+} else if (StreamStreamJoinUtil.skipRecord(rightRecord, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+// Emit all non-joined records which window has closed
+if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
+rightOuterJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, rightRecord));
+}
+
+boolean needOuterJoin = outer;
+// TODO: Where and when are the records put in this window store?
+try (final WindowStoreIterator iter = 
otherWindowStore.fetch(rightRecord.key(), timeFrom, timeTo)) {
+while (iter.hasNext()) {
+needOuterJoin = false;
+final KeyValue leftRecord = iter.next();
+final long leftRecordTimestamp = leftRecord.key;
+
+rightOuterJoinStore.ifPresent(store -> {
+// use putIfAbsent to first read and see if there's 
any values for the key,
+// if yes delete the key, otherwise do not issue a put;
+// we may delete some values with the same key early 
but since we are going
+// range over all values of the same key even after 
failure, since the other window-store
+// is only cleaned up by stream time, so this is okay 
for at-least-once.
+store.putIfAbsent(
+
TimestampedKeyAndJoinSide.makeLeftSide(rightRecord.key(), leftRecordTimestamp), 
null);
+});
+
+final VOut joinResult = joiner.apply(rightRecord.key(), 

Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545727392


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -16,276 +16,98 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import java.util.Optional;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
 import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
 import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-
-import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
-import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
-
-class KStreamKStreamJoin implements ProcessorSupplier {
-private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
 
+abstract class KStreamKStreamJoin implements 
ProcessorSupplier {
+protected final ValueJoinerWithKey joiner;
+protected final long joinGraceMs;
+protected final boolean outer;
+protected final long joinBeforeMs;
+protected final long joinAfterMs;
 private final String otherWindowName;
-private final long joinBeforeMs;
-private final long joinAfterMs;
-private final long joinGraceMs;
 private final boolean enableSpuriousResultFix;
 private final long windowsBeforeMs;
 private final long windowsAfterMs;
-
-private final boolean outer;
-private final boolean isLeftSide;
 private final Optional outerJoinWindowName;
-private final ValueJoinerWithKey joiner;
 
 private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 
-KStreamKStreamJoin(final boolean isLeftSide,
-   final String otherWindowName,
-   final JoinWindowsInternal windows,
-   final ValueJoinerWithKey joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-this.isLeftSide = isLeftSide;
+KStreamKStreamJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final long joinBeforeMs,
+final long joinAfterMs,
+final ValueJoinerWithKey joiner,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier,
+final boolean outer) {
 this.otherWindowName = otherWindowName;
-if (isLeftSide) {
-this.joinBeforeMs = windows.beforeMs;
-this.joinAfterMs = windows.afterMs;
-} else {
-this.joinBeforeMs = windows.afterMs;
-this.joinAfterMs = windows.beforeMs;
-}
 this.windowsAfterMs = windows.afterMs;
 this.windowsBeforeMs = windows.beforeMs;
-this.joinGraceMs = windows.gracePeriodMs();
+this.joinBeforeMs = joinBeforeMs;
+this.joinAfterMs = joinAfterMs;
 this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
 this.joiner = joiner;
-this.outer = outer;
+this.joinGraceMs = windows.gracePeriodMs();
 this.outerJoinWindowName = outerJoinWindowName;
 this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
+this.outer = outer;
 }
 
-@Override
-public Processor get() {
-return new KStreamKStreamJoinProcessor();
-}
-
-private class KStreamKStreamJoinProcessor extends ContextualProcessor {
-private WindowStore otherWindowStore;
-

Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


mcmmining commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545728831


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.Optional;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamLeftJoin extends KStreamKStreamJoin {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
+
+KStreamKStreamLeftJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner,
+final boolean outer,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+super(otherWindowName, windows, windows.beforeMs, windows.afterMs, joiner, outerJoinWindowName,
+sharedTimeTrackerSupplier, outer);
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamLeftJoinProcessor();
+}
+
+private class KStreamKStreamLeftJoinProcessor extends KStreamKStreamJoinProcessor {
+@Override
+public void process(final Record leftRecord) {
+final long inputRecordTimestamp = leftRecord.timestamp();
+final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
+final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+if (outer && leftRecord.key() == null && leftRecord.value() != null) {
+final VOut leftJoinValue = joiner.apply(leftRecord.key(), leftRecord.value(), null);
+context().forward(leftRecord.withValue(leftJoinValue));
+return;
+} else if (StreamStreamJoinUtil.skipRecord(leftRecord, LOG, droppedRecordsSensor, context())) {
+return;
+}
+
+// Emit all non-joined records which window has closed
+if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
+leftOuterJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, leftRecord));
+}
+
+boolean needOuterJoin = outer;
+try (final WindowStoreIterator iter = otherWindowStore.fetch(leftRecord.key(), timeFrom, timeTo)) {

Review Comment:
   > I also have a couple of questions. When is this store getting filled? I 
couldn't find anywhere where we call `.put()` for this window store. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1545727392


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -16,276 +16,98 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import java.util.Optional;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
 import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
 import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-
-import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
-import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
-
-class KStreamKStreamJoin implements ProcessorSupplier {
-private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
 
+abstract class KStreamKStreamJoin implements ProcessorSupplier {
+protected final ValueJoinerWithKey joiner;
+protected final long joinGraceMs;
+protected final boolean outer;
+protected final long joinBeforeMs;
+protected final long joinAfterMs;
 private final String otherWindowName;
-private final long joinBeforeMs;
-private final long joinAfterMs;
-private final long joinGraceMs;
 private final boolean enableSpuriousResultFix;
 private final long windowsBeforeMs;
 private final long windowsAfterMs;
-
-private final boolean outer;
-private final boolean isLeftSide;
 private final Optional outerJoinWindowName;
-private final ValueJoinerWithKey joiner;
 
 private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 
-KStreamKStreamJoin(final boolean isLeftSide,
-   final String otherWindowName,
-   final JoinWindowsInternal windows,
-   final ValueJoinerWithKey joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-this.isLeftSide = isLeftSide;
+KStreamKStreamJoin(final String otherWindowName,
+final JoinWindowsInternal windows,
+final long joinBeforeMs,
+final long joinAfterMs,
+final ValueJoinerWithKey joiner,
+final Optional outerJoinWindowName,
+final TimeTrackerSupplier sharedTimeTrackerSupplier,
+final boolean outer) {
 this.otherWindowName = otherWindowName;
-if (isLeftSide) {
-this.joinBeforeMs = windows.beforeMs;
-this.joinAfterMs = windows.afterMs;
-} else {
-this.joinBeforeMs = windows.afterMs;
-this.joinAfterMs = windows.beforeMs;
-}
 this.windowsAfterMs = windows.afterMs;
 this.windowsBeforeMs = windows.beforeMs;
-this.joinGraceMs = windows.gracePeriodMs();
+this.joinBeforeMs = joinBeforeMs;
+this.joinAfterMs = joinAfterMs;
 this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
 this.joiner = joiner;
-this.outer = outer;
+this.joinGraceMs = windows.gracePeriodMs();
 this.outerJoinWindowName = outerJoinWindowName;
 this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
+this.outer = outer;
 }
 
-@Override
-public Processor get() {
-return new KStreamKStreamJoinProcessor();
-}
-
-private class KStreamKStreamJoinProcessor extends ContextualProcessor {
-private WindowStore otherWindowStore;
-

Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-31 Thread via GitHub


raminqaf commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2028756399

   @gharris1727 I have broken down the KStreamKStreamJoin class into two 
classes. For now, I just moved the code (plus the fix in #15510) to see whether 
all the tests pass and whether I am going in the right direction. I managed to 
get rid of the unsafe type casts. 
   I created [two outer join 
stores](https://github.com/raminqaf/kafka/blob/dc608850c5e7af1a6f589b30f9f7f0921f64942c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L76-L79)
 (one for the left side and one for the right side). This removes one of the 
unsafe casts. The other unsafe cast, on the right side of the join, [was 
removed based on the join's left or right 
value](https://github.com/raminqaf/kafka/blob/dc608850c5e7af1a6f589b30f9f7f0921f64942c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java#L215-L218).
   
   I changed the PR into a draft because it depends on #15510. Please have a 
look and let me know if this is going in the right direction.
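
   The split described above can be illustrated with a minimal sketch. The 
class names below are hypothetical stand-ins, not the classes in this PR, and 
the mirrored window on the right side is an assumption taken from the removed 
`isLeftSide` branch. Each subclass fixes which of its inputs is the left 
value, so the joiner can be called without a cast.

```java
// Hypothetical, simplified stand-ins; these are not the Kafka Streams classes.
abstract class JoinSketch<VThis, VOther, VLeft, VRight, VOut> {
    protected final java.util.function.BiFunction<VLeft, VRight, VOut> joiner;
    protected final long joinBeforeMs;
    protected final long joinAfterMs;

    JoinSketch(final java.util.function.BiFunction<VLeft, VRight, VOut> joiner,
               final long joinBeforeMs,
               final long joinAfterMs) {
        this.joiner = joiner;
        this.joinBeforeMs = joinBeforeMs;
        this.joinAfterMs = joinAfterMs;
    }

    // Lower bound of the lookup range in the other side's store, clamped at 0.
    long timeFrom(final long recordTimestamp) {
        return Math.max(0L, recordTimestamp - joinBeforeMs);
    }

    // Upper bound of the lookup range in the other side's store.
    long timeTo(final long recordTimestamp) {
        return Math.max(0L, recordTimestamp + joinAfterMs);
    }

    // Each side knows statically which argument is the left value.
    abstract VOut join(VThis thisValue, VOther otherValue);
}

final class LeftJoinSketch<VLeft, VRight, VOut>
        extends JoinSketch<VLeft, VRight, VLeft, VRight, VOut> {
    LeftJoinSketch(final java.util.function.BiFunction<VLeft, VRight, VOut> joiner,
                   final long beforeMs,
                   final long afterMs) {
        super(joiner, beforeMs, afterMs);
    }

    @Override
    VOut join(final VLeft thisValue, final VRight otherValue) {
        return joiner.apply(thisValue, otherValue);
    }
}

final class RightJoinSketch<VLeft, VRight, VOut>
        extends JoinSketch<VRight, VLeft, VLeft, VRight, VOut> {
    RightJoinSketch(final java.util.function.BiFunction<VLeft, VRight, VOut> joiner,
                    final long beforeMs,
                    final long afterMs) {
        // Assumption: the right side looks through the mirrored window.
        super(joiner, afterMs, beforeMs);
    }

    @Override
    VOut join(final VRight thisValue, final VLeft otherValue) {
        // The argument order is swapped here instead of branching on a flag.
        return joiner.apply(otherValue, thisValue);
    }
}
```

   With this shape the compiler rejects a call that passes a right value where 
a left value is expected, which a boolean flag cannot express.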





Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-29 Thread via GitHub


gharris1727 commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2027729205

   This PR looks like it will have merge conflicts with #15510. Since that PR 
fixes a bug, it should probably take priority over this refactor. Can you 
rebase on top of its changes? Alternatively, we can wait for it to land 
first before returning to this PR.





Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-29 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1544779363


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##
@@ -41,6 +41,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.streams.state.internals.JoinSide;

Review Comment:
   I just noticed that the JoinSide package is `state`, while this class is 
`kstream`. Since this enum is so tightly bound to this class, it should 
probably be in the same package.






Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-29 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1544787082


##
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based 
on the side specified.
+ */
+@SuppressWarnings("unchecked")

Review Comment:
   It looks like this is just moving the unchecked warning around, rather than 
fixing it.
   ` LeftOrRightValue make(final V leftValue);` is 
impossible to type in a similar way to ` LeftOrRightValue make(final boolean 
isLeftSide, final V value)`, because it will always require casting V to V1 or 
V2.
   
   It might not be possible to do this with an enum, since an enum type can't 
take type arguments.
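
   The typing problem can be reproduced in isolation. The following is a 
minimal sketch, assuming a hypothetical holder `EitherSketch` and enum 
`SideSketch` (neither is the class under review). Because nothing ties the 
method's value type `V` to the holder's `L` or `R`, the enum method only 
compiles with an unchecked cast, and a mismatched call site compiles cleanly 
and fails later.

```java
// Hypothetical stand-in for a two-sided value holder; not the Kafka class.
final class EitherSketch<L, R> {
    final L left;
    final R right;

    EitherSketch(final L left, final R right) {
        this.left = left;
        this.right = right;
    }
}

enum SideSketch {
    LEFT {
        @Override
        @SuppressWarnings("unchecked")
        <V, L, R> EitherSketch<L, R> make(final V value) {
            // V is unrelated to L, so this needs an unchecked cast.
            return (EitherSketch<L, R>) new EitherSketch<>(value, null);
        }
    },
    RIGHT {
        @Override
        @SuppressWarnings("unchecked")
        <V, L, R> EitherSketch<L, R> make(final V value) {
            // V is unrelated to R, so this needs an unchecked cast too.
            return (EitherSketch<L, R>) new EitherSketch<>(null, value);
        }
    };

    abstract <V, L, R> EitherSketch<L, R> make(V value);
}

final class HeapPollutionDemo {
    // Returns true if the mismatch only surfaces when the value is read.
    static boolean castFailsAtUse() {
        // Compiles although a String ends up in the Integer-typed left slot.
        final EitherSketch<Integer, String> polluted = SideSketch.LEFT.make("not an integer");
        try {
            final Integer left = polluted.left; // compiler-inserted cast to Integer
            return left == null;
        } catch (final ClassCastException expected) {
            return true;
        }
    }
}
```

   Suppressing the warning on the enum hides the problem from the compiler 
output but does not remove it.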



##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##
@@ -89,8 +74,8 @@ public V2 getRightValue() {
 @Override
 public String toString() {
 return "<"
-+ ((leftValue != null) ? "left," + leftValue : "right," + rightValue)
-+ ">";
++ ((leftValue != null) ? JoinSide.LEFT + "," + leftValue : JoinSide.RIGHT + "," + rightValue)

Review Comment:
   This doesn't seem necessary. LeftOrRightValue can be completely unaware of 
the JoinSide enum.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java:
##
@@ -62,13 +62,13 @@ public LeftOrRightValue deserialize(final String topic, final byte[] dat
 }
 
 return (data[0] == 1)
-? LeftOrRightValue.makeLeftValue(leftDeserializer.deserialize(topic, rawValue(data)))

Review Comment:
   Same comment as in LeftOrRightValue, I don't think this is necessary.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based 
on the side specified.
+ */
+@SuppressWarnings("unchecked")
+public enum JoinSide {
+LEFT("left") {
+/**
+ * Create a new {@link LeftOrRightValue} instance with the V1 value as 
{@code leftValue} and V2 value as null.
+ *
+ * @param leftValue the left V1 value
+ * @param  the type of the value
+ * @return a new {@link LeftOrRightValue} instance
+ */
+@Override
+public  LeftOrRightValue make(final V leftValue) {
+Objects.requireNonNull(leftValue, "The left join value is null");
+return (LeftOrRightValue) new 
LeftOrRightValue<>(leftValue, null);
+}
+},
+
+RIGHT("right") {
+/**
+ * Create a new {@link LeftOrRightValue} instance with the V2 value as 
{@code rightValue} and V1 value as null.
+ *
+ * @param rightValue the right V2 value
+ * @param  the type of the value
+ * @return a new {@link LeftOrRightValue} instance
+ */
+@Override
+public  LeftOrRightValue make(final V rightValue) {
+

Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-29 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1544749545


##
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based 
on the side specified.
+ */
+@SuppressWarnings("unchecked")
+public enum JoinSide {
+LEFT("left") {
+/**
+ * Create a new {@link LeftOrRightValue} instance with the V1 value as 
{@code leftValue} and V2 value as null.
+ *
+ * @param leftValue the left V1 value
+ * @param  the type of the value
+ * @return a new {@link LeftOrRightValue} instance
+ */
+@Override
+public  LeftOrRightValue make(final V leftValue) {
+Objects.requireNonNull(leftValue, "The left join value is null");
+return (LeftOrRightValue) new 
LeftOrRightValue<>(leftValue, null);
+}
+},
+
+RIGHT("right") {
+/**
+ * Create a new {@link LeftOrRightValue} instance with the V2 value as 
{@code rightValue} and V1 value as null.
+ *
+ * @param rightValue the right V2 value
+ * @param  the type of the value
+ * @return a new {@link LeftOrRightValue} instance
+ */
+@Override
+public  LeftOrRightValue make(final V rightValue) {
+Objects.requireNonNull(rightValue, "The right join value is null");
+return (LeftOrRightValue) new LeftOrRightValue<>(null, 
rightValue);
+}
+
+};
+
+private final String joinSideName;
+
+JoinSide(final String joinSideName) {
+this.joinSideName = joinSideName;
+}
+
+public abstract  LeftOrRightValue make(final V value);
+
+/**
+ * Returns true if this JoinSide represents the left side.
+ *
+ * @return true if this JoinSide represents the left side, otherwise false
+ */
+public boolean isLeftSide() {

Review Comment:
   Closed the "hatch" forever!  I had it removed!






Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-29 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1544749273


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##


Review Comment:
   @gharris1727 Thanks for the fast review! I have addressed your comments. In 
my opinion, the bigger refactoring should be done in a follow-up ticket/issue, 
and this one should specifically solve the casting issue.
   This is also my first contribution here, so I don't want to move a lot of 
parts and have a forever open PR.






Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-26 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1540138631


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##
@@ -39,45 +39,6 @@ private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
 this.rightValue = rightValue;
 }
 
-/**
- * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and
- * V2 value as null.
- *
- * @param leftValue the left V1 value
- * @param   the type of the value
- * @return a new {@link LeftOrRightValue} instance
- */
-public static  LeftOrRightValue makeLeftValue(final V1 leftValue) {

Review Comment:
   I think you should keep the `makeLeftValue` and `makeRightValue` functions 
as entrypoints to this class. Only the impossible-to-type `make` function 
should be removed.
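
   For illustration, here is a minimal sketch of why the two side-specific 
factories type-check without any cast. `LeftOrRightSketch` is a hypothetical 
stand-in that only mirrors the method names quoted in the diff; it is not the 
real class.

```java
// Hypothetical stand-in; only the factory names follow the quoted diff.
final class LeftOrRightSketch<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRightSketch(final V1 leftValue, final V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    // The parameter type is V1, so the value can only land in the left slot.
    static <V1, V2> LeftOrRightSketch<V1, V2> makeLeftValue(final V1 leftValue) {
        java.util.Objects.requireNonNull(leftValue, "The left join value is null");
        return new LeftOrRightSketch<>(leftValue, null);
    }

    // The parameter type is V2, so the value can only land in the right slot.
    static <V1, V2> LeftOrRightSketch<V1, V2> makeRightValue(final V2 rightValue) {
        java.util.Objects.requireNonNull(rightValue, "The right join value is null");
        return new LeftOrRightSketch<>(null, rightValue);
    }

    V1 getLeftValue() {
        return leftValue;
    }

    V2 getRightValue() {
        return rightValue;
    }
}
```

   A single `make(isLeftSide, value)` entrypoint cannot be typed this way, 
because one parameter would have to be both `V1` and `V2` depending on a 
runtime flag.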



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -250,7 +252,7 @@ private void emitNonJoinedOuterRecords(
 }
 
 final VOut nullJoinedValue;
-if (isLeftSide) {
+if (joinSide.isLeftSide()) {

Review Comment:
   This is another unsafe place that should be part of the enum.
   Try and eliminate all of the `unchecked` warnings in this class.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -155,7 +156,8 @@ public void process(final Record record) {
 // we may delete some values with the same key early but since we are going
 // range over all values of the same key even after failure, since the other window-store
 // is only cleaned up by stream time, so this is okay for at-least-once.
-store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null);
+final JoinSide otherJoinSide = joinSide.isLeftSide() ? JoinSide.RIGHT : JoinSide.LEFT;

Review Comment:
   This could be a `JoinSide#complement` or `JoinSide#opposite` function, in 
case there are other situations where we need to know what the other side of 
the join is doing.
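
   A minimal sketch of that suggestion follows. The enum name `JoinSideSketch` 
is hypothetical, and `opposite` is one of the two method names proposed 
above, not an existing API.

```java
// Hypothetical enum; "opposite" is the helper name proposed in the review.
enum JoinSideSketch {
    LEFT,
    RIGHT;

    // Returns the other side of the join, so call sites need no ternary.
    JoinSideSketch opposite() {
        return this == LEFT ? RIGHT : LEFT;
    }
}
```

   The call site in the hunk above would then read along the lines of 
`joinSide.opposite()` rather than repeating the conditional.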



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##
@@ -173,7 +174,7 @@ public  KStream join(final KStream lhs,
 
 final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
 final KStreamKStreamJoin joinThis = new KStreamKStreamJoin<>(
-true,
+JoinSide.LEFT,

Review Comment:
   I think these call-sites are much more clear with the explicit "left" and 
"right". It's interesting that the variables here are joinThis and joinOther, 
instead of joinLeft and joinRight.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based 
on the side specified.
+ */
+@SuppressWarnings("unchecked")
+public enum JoinSide {
+LEFT("left") {
+/**
+ * Create a new {@link LeftOrRightValue} instance with the V1 value as 
{@code leftValue} and V2 value as null.
+ *
+ * @param leftValue the left V1 value
+ * @param  the type of the value
+ * @return a new {@link LeftOrRightValue} instance
+ */
+@Override
+public  LeftOrRightValue make(final V leftValue) {
+Objects.requireNonNull(leftValue, "The left join value is null");
+return (LeftOrRightValue) new 
LeftOrRightValue<>(leftValue, null);
+}
+},
+
+RIGHT("right") {
+/**
+ * Create a new {@link LeftOrRightValue} instance with the V2 value as 
{@code rightValue} and V1 value as null.
+ *
+ * @param rightValue the right V2 value
+ * 

Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-26 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1540136195


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##


Review Comment:
   I assumed that this would be necessary. I think either could work, and it's 
up to your taste. If this enum is only useful for this one class, that 
could indicate that the enum and joiner are too coupled. I'm not familiar 
with Streams, but maybe there's another class that could benefit from this 
functionality?






Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-03-26 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1539078322


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##


Review Comment:
   In retrospect, we can refactor this class and break it into two 
subclasses (`LeftJoinValue` & `RightJoinValue`) that implement the same 
interface (`JoinValue`). We can use the interface in the enum to make the 
objects.
   I already created these on my machine, but I wanted to avoid making 
significant changes in a single PR and moving too many classes simultaneously. 
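
   One possible reading of this idea is sketched below. All names carry a 
`Sketch` suffix because the actual classes are not shown in this thread, and 
the single type parameter is an assumption. With a common interface, the enum 
method can return the interface type directly, so no unchecked cast is needed.

```java
// Hypothetical types; the single type parameter is an assumption.
interface JoinValueSketch<V> {
    V value();
}

final class LeftJoinValueSketch<V> implements JoinValueSketch<V> {
    private final V value;

    LeftJoinValueSketch(final V value) {
        this.value = java.util.Objects.requireNonNull(value, "The left join value is null");
    }

    @Override
    public V value() {
        return value;
    }
}

final class RightJoinValueSketch<V> implements JoinValueSketch<V> {
    private final V value;

    RightJoinValueSketch(final V value) {
        this.value = java.util.Objects.requireNonNull(value, "The right join value is null");
    }

    @Override
    public V value() {
        return value;
    }
}

enum ValueSideSketch {
    LEFT {
        @Override
        <V> JoinValueSketch<V> make(final V value) {
            return new LeftJoinValueSketch<>(value);
        }
    },
    RIGHT {
        @Override
        <V> JoinValueSketch<V> make(final V value) {
            return new RightJoinValueSketch<>(value);
        }
    };

    // The return type mentions only V, so each constant can implement it safely.
    abstract <V> JoinValueSketch<V> make(V value);
}
```

   The trade-off in this sketch is that the static type no longer records 
which side a value belongs to; callers would have to inspect the concrete 
class.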





