mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r649031614



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -67,7 +67,7 @@
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
  * @see TimestampExtractor
  */
-public final class JoinWindows extends Windows<Window> {
+public class JoinWindows extends Windows<Window> {

Review comment:
       Need to drop `final` here to be able to add a `JoinWindowsInternal` subclass 
that can access the newly added flag.
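   
   A minimal sketch of why `final` has to go, using simplified stand-in classes 
rather than the real Kafka ones. `WindowsBase`, `WindowsInternal`, and the 
accessor name `spuriousResultFixEnabled()` are assumptions for illustration only:

```java
// Stand-in for the public, now non-final, windows class (not the real JoinWindows).
class WindowsBase {
    protected final boolean enableSpuriousResultFix;

    WindowsBase(final boolean enableSpuriousResultFix) {
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Copy constructor so a subclass can wrap an existing instance.
    protected WindowsBase(final WindowsBase other) {
        this.enableSpuriousResultFix = other.enableSpuriousResultFix;
    }
}

// Stand-in for the internal subclass; it can only exist because WindowsBase is not final.
final class WindowsInternal extends WindowsBase {
    WindowsInternal(final WindowsBase windows) {
        super(windows);
    }

    // Hypothetical accessor that exposes the protected flag to internal code.
    boolean spuriousResultFixEnabled() {
        return enableSpuriousResultFix;
    }
}
```

   Keeping the flag `protected` means it does not show up as a public getter on 
the user-facing class.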

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -57,18 +58,22 @@
 
     KStreamKStreamJoin(final boolean isLeftSide,
                        final String otherWindowName,
-                       final long joinBeforeMs,
-                       final long joinAfterMs,
-                       final long joinGraceMs,
+                       final JoinWindowsInternal windows,
                        final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, ? extends R> joiner,
                        final boolean outer,
                        final Optional<String> outerJoinWindowName,
                        final KStreamImplJoin.MaxObservedStreamTime 
maxObservedStreamTime) {
         this.isLeftSide = isLeftSide;
         this.otherWindowName = otherWindowName;
-        this.joinBeforeMs = joinBeforeMs;
-        this.joinAfterMs = joinAfterMs;
-        this.joinGraceMs = joinGraceMs;
+        if (isLeftSide) {

Review comment:
       This was done by the caller before, i.e., `KStreamImplJoin` from above. As 
we only pass one parameter now, we need to flip before/after here if necessary.
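   
   Roughly, the flip could look like the sketch below. `JoinSideBounds` is a 
hypothetical stand-in (the real constructor takes a `JoinWindowsInternal`), and 
the exact field handling in the PR may differ:

```java
// Hypothetical helper showing the before/after flip for the two join sides.
final class JoinSideBounds {
    final long joinBeforeMs;
    final long joinAfterMs;

    JoinSideBounds(final boolean isLeftSide, final long beforeMs, final long afterMs) {
        if (isLeftSide) {
            // Left side uses the window bounds as configured.
            this.joinBeforeMs = beforeMs;
            this.joinAfterMs = afterMs;
        } else {
            // Right side looks at the same window from the other direction.
            this.joinBeforeMs = afterMs;
            this.joinAfterMs = beforeMs;
        }
    }
}
```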

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -88,7 +90,22 @@ public void testLeftJoinWithInvalidSpuriousResultFixFlag() {
     }
 
     @Test
-    public void testLeftJoinWithSpuriousResultFixDisabled() {
+    public void testLeftJoinWithSpuriousResultFixDisabledViaFeatureFlag() {
+        runLeftJoinWithoutSpuriousResultFix(
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),
+            false
+        );
+    }
+    @Test
+    public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {

Review comment:
       I duplicated this test to verify that both the feature flag and the 
old API disable this fix. Thus, the usage of the old API in this method should 
not be changed by the KIP-633 PR.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -150,12 +150,11 @@ public long get() {
         // Time shared between joins to keep track of the maximum stream time
         final MaxObservedStreamTime maxObservedStreamTime = new 
MaxObservedStreamTime();
 
+        final JoinWindowsInternal internalWindows = new 
JoinWindowsInternal(windows);
         final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new 
KStreamKStreamJoin<>(
             true,
             otherWindowStore.name(),
-            windows.beforeMs,
-            windows.afterMs,
-            windows.gracePeriodMs(),
+            internalWindows,

Review comment:
       Easier to pass one parameter instead of four (the three window values plus 
the new flag).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;

Review comment:
       This is the new flag. We set it to `false` if the old methods are used, 
and to `true` for the new methods from KIP-633.
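   
   A simplified sketch of the idea, assuming plain `long` milliseconds instead of 
`Duration` to keep it dependency-free. `JoinWindowsSketch` is a stand-in, not 
the real class; only the factory names mirror the ones used in this PR:

```java
// Stand-in for JoinWindows: the factory method decides the value of the flag.
final class JoinWindowsSketch {
    static final long DEFAULT_GRACE_MS = 24L * 60L * 60L * 1000L; // old 24h default

    final long beforeMs;
    final long afterMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    private JoinWindowsSketch(final long beforeMs,
                              final long afterMs,
                              final long graceMs,
                              final boolean enableSpuriousResultFix) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Old API: keeps the old behavior, so the fix stays disabled.
    static JoinWindowsSketch of(final long timeDifferenceMs) {
        return new JoinWindowsSketch(timeDifferenceMs, timeDifferenceMs, DEFAULT_GRACE_MS, false);
    }

    // New API (KIP-633 naming): the fix is enabled.
    static JoinWindowsSketch ofTimeDifferenceAndGrace(final long timeDifferenceMs, final long graceMs) {
        return new JoinWindowsSketch(timeDifferenceMs, timeDifferenceMs, graceMs, true);
    }

    // New API (KIP-633 naming): no grace period, fix enabled.
    static JoinWindowsSketch ofTimeDifferenceWithNoGrace(final long timeDifferenceMs) {
        return ofTimeDifferenceAndGrace(timeDifferenceMs, 0L);
    }
}
```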

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) 
throws IllegalArgume
     public JoinWindows before(final Duration timeDifference) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, afterMs, 
DEFAULT_GRACE_PERIOD_MS);
+        return new JoinWindows(timeDifferenceMs, afterMs, graceMs, 
enableSpuriousResultFix);

Review comment:
       Side fix: `before()` used to reset grace to the 24h default (not sure why; 
seems to be a bug).
   
   Same for `after()` below.
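   
   To illustrate the difference, here is a small sketch with a stand-in class 
(`GraceWindows` and `beforeResettingGrace` are hypothetical names; times are 
plain `long` milliseconds rather than `Duration`):

```java
// Stand-in showing the old (grace-resetting) and the fixed before() behavior.
final class GraceWindows {
    static final long DEFAULT_GRACE_MS = 24L * 60L * 60L * 1000L; // 24h

    final long beforeMs;
    final long afterMs;
    final long graceMs;

    GraceWindows(final long beforeMs, final long afterMs, final long graceMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    GraceWindows grace(final long newGraceMs) {
        return new GraceWindows(beforeMs, afterMs, newGraceMs);
    }

    // Old behavior: silently discards a previously configured grace period.
    GraceWindows beforeResettingGrace(final long timeDifferenceMs) {
        return new GraceWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_MS);
    }

    // Fixed behavior: carries the configured grace period over.
    GraceWindows before(final long timeDifferenceMs) {
        return new GraceWindows(timeDifferenceMs, afterMs, graceMs);
    }
}
```

   With the old variant, the order of `grace()` and `before()` calls changes the 
result; with the fixed variant it does not.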

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;

Review comment:
       Side cleanup

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
##########
@@ -36,15 +36,13 @@
  * Too much information to generalize, so Stream-Stream joins are represented 
by a specific node.
  */
 public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K, V1, V2, VR> {
-    private static final Properties EMPTY_PROPERTIES = new Properties();

Review comment:
       unused.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
##########
@@ -84,53 +84,56 @@ public void setup() {
     @Test
     public void shouldIncludeKeyInStreamSteamJoinResults() {
         leftStream.join(
-                rightStream,

Review comment:
       Just some cleanup. The actual change is above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
##########
@@ -102,7 +102,14 @@ public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder, final
         topologyBuilder.addStateStore(thisWindowStoreBuilder, 
thisWindowedStreamProcessorName, otherProcessorName);
         topologyBuilder.addStateStore(otherWindowStoreBuilder, 
otherWindowedStreamProcessorName, thisProcessorName);
 
-        if (props == null || StreamsConfig.InternalConfig.getBoolean(new 
HashMap(props), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
+        if (enableSpuriousResultFix &&

Review comment:
       We evaluate the feature flag twice. This is the second time. So we add 
the new flag, too.
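   
   In other words, the fix is only active when both switches agree. A rough 
sketch, where `SpuriousFixGate` and `FLAG_KEY` are placeholders and not the 
actual Kafka config handling:

```java
// Hypothetical gate combining the JoinWindows flag with the internal config flag.
final class SpuriousFixGate {
    // Placeholder key for illustration; not the real internal config name.
    static final String FLAG_KEY = "example.enable.spurious.results.fix";

    static boolean isFixActive(final boolean enableSpuriousResultFix,
                               final java.util.Map<String, ?> props) {
        if (!enableSpuriousResultFix) {
            return false; // old API was used, fix stays off
        }
        if (props == null) {
            return true; // no config given, feature flag defaults to enabled
        }
        final Object value = props.get(FLAG_KEY);
        // A missing entry falls back to the default of `true`.
        return value == null || Boolean.parseBoolean(value.toString());
    }
}
```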

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -63,8 +64,6 @@
 import static org.junit.Assert.assertTrue;
 
 public class KStreamKStreamJoinTest {
-    private final static KeyValueTimestamp<?, ?>[] EMPTY = new 
KeyValueTimestamp[0];

Review comment:
       unnecessary

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if 
(StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), 
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       We evaluate the feature flag twice. This is the first time. So we add 
the new flag, too.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;
+
+    protected JoinWindows(final JoinWindows joinWindows) {
+        beforeMs = joinWindows.beforeMs;
+        afterMs = joinWindows.afterMs;
+        graceMs = joinWindows.graceMs;
+        enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+    }
+
     private JoinWindows(final long beforeMs,
                         final long afterMs,
-                        final long graceMs) {
+                        final long graceMs,
+                        final boolean enableSpuriousResultFix) {
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
+        this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
-    /**
+    public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {

Review comment:
       Kept the overlap with the KIP-633 PR to a minimum. The key point is that this 
and the method below must set the new flag to `true`.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##########
@@ -138,9 +143,12 @@ public void testInnerRepartitioned() {
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
-                .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
-                                 
.selectKey(MockMapper.selectKeyKeyValueMapper()),
-                       valueJoiner, 
JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
+            .join(
+                rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                    .selectKey(MockMapper.selectKeyKeyValueMapper()),
+                valueJoiner,
+                JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), 
ofHours(24))

Review comment:
       Need to switch to the new API to keep the fix enabled. Otherwise the 
test would break.
   
   Set grace to 24h, which was the old default, to ensure nothing breaks. 
Similar in other tests below.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -396,7 +416,7 @@ public void testJoin() {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult(EMPTY);
+            processor.checkAndClearProcessResult();

Review comment:
       We can pass no arguments to check for "no result".
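   
   This works because the check takes varargs, so an empty call becomes an 
empty expected list. A small sketch with a hypothetical `ResultChecker` that 
uses `String` results instead of `KeyValueTimestamp`:

```java
// Hypothetical stand-in for the test processor's result check.
final class ResultChecker {
    private final java.util.List<String> processed = new java.util.ArrayList<>();

    void process(final String result) {
        processed.add(result);
    }

    // With no arguments, `expected` is an empty array, i.e. "expect no result".
    void checkAndClearProcessResult(final String... expected) {
        final java.util.List<String> expectedList = java.util.Arrays.asList(expected);
        if (!processed.equals(expectedList)) {
            throw new AssertionError("expected " + expectedList + " but got " + processed);
        }
        processed.clear();
    }
}
```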

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -203,68 +202,79 @@ public void 
shouldEnableLoggingWithCustomConfigOnStreamJoined() {
     @Test
     public void 
shouldThrowExceptionThisStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
         // Case where retention of thisJoinStore doesn't match JoinWindows
-        final WindowBytesStoreSupplier thisStoreSupplier = 
buildWindowBytesStoreSupplier("in-memory-join-store", 500, 100, true);

Review comment:
       Did some side cleanup... Will highlight actual changes.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -852,25 +942,26 @@ private void testUpperWindowBound(final int[] 
expectedKeys,
         // push a dummy record to produce all left-join non-joined items
         time += 301L;
         inputTopic1.pipeInput(0, "dummy", time);
-
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"C0+null", 1101),
-            new KeyValueTimestamp<>(0, "D0+null", 1102),
-            new KeyValueTimestamp<>(1, "D1+null", 1102),
-            new KeyValueTimestamp<>(0, "E0+null", 1103),
-            new KeyValueTimestamp<>(1, "E1+null", 1103),
-            new KeyValueTimestamp<>(2, "E2+null", 1103),
-            new KeyValueTimestamp<>(0, "F0+null", 1104),
-            new KeyValueTimestamp<>(1, "F1+null", 1104),
-            new KeyValueTimestamp<>(2, "F2+null", 1104),
-            new KeyValueTimestamp<>(3, "F3+null", 1104));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "C0+null", 1101L),
+            new KeyValueTimestamp<>(0, "D0+null", 1102L),
+            new KeyValueTimestamp<>(1, "D1+null", 1102L),
+            new KeyValueTimestamp<>(0, "E0+null", 1103L),
+            new KeyValueTimestamp<>(1, "E1+null", 1103L),
+            new KeyValueTimestamp<>(2, "E2+null", 1103L),
+            new KeyValueTimestamp<>(0, "F0+null", 1104L),
+            new KeyValueTimestamp<>(1, "F1+null", 1104L),
+            new KeyValueTimestamp<>(2, "F2+null", 1104L),
+            new KeyValueTimestamp<>(3, "F3+null", 1104L)
+        );
     }
 
     private void testLowerWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
                                       final MockProcessor<Integer, String> 
processor) {
         long time;
         final TestInputTopic<Integer, String> inputTopic1 = 
driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer());
-        final TestInputTopic<Integer, String> inputTopic2 = 
driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer());

Review comment:
       unused

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -483,8 +513,9 @@ public void testOuterJoin() {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),

Review comment:
       Switch to new API. (Not all tests in this class need the switch for this 
fix. Leave it for the KIP-633 PR to update the others when the old API gets 
deprecated.)

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -74,8 +75,9 @@ public void testLeftJoinWithInvalidSpuriousResultFixFlag() {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),

Review comment:
       Switch to new API (similar below for all other tests in this class)

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -72,8 +72,9 @@ public void testOuterJoinWithInvalidSpuriousResultFixFlag() {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),

Review comment:
       Same as left-join-test above.
   
   Switch to new API for all cases, and duplicate the "disable fix" test using 
feature flag and old API.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -138,13 +158,30 @@ public void testLeftJoinWithSpuriousResultFixDisabled() {
             for (int i = 0; i < 2; i++) {
                 inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"A0+a0", 0),
-                new KeyValueTimestamp<>(1, "A1+a1", 0));
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+a0", 0L),
+                new KeyValueTimestamp<>(1, "A1+a1", 0L)
+            );
         }
     }
 
     @Test
-    public void testLeftJoinDuplicatesWithFixDisabled() {
+    public void 
testLeftJoinDuplicatesWithSpuriousResultFixDisabledFeatureFlag() {
+        testLeftJoinDuplicatesSpuriousResultFix(
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(10L)),
+            false
+        );
+    }
+    @Test
+    public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
+        testLeftJoinDuplicatesSpuriousResultFix(
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),

Review comment:
       As above.




