mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1491795591


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -191,6 +191,10 @@ public void process(final Record<K, V1> record) {
             }
         }
 
+        private boolean isActiveWindow(final long timeFrom, final long timeTo) {
+            return sharedTimeTracker.streamTime >= timeFrom && timeTo + joinGraceMs >= sharedTimeTracker.streamTime;

Review Comment:
   Do we need to check `sharedTimeTracker.streamTime >= timeFrom`?
   
   If I am not mistaken, `timeTo` is always larger than `timeFrom`, and `graceMs` is always non-negative. Thus, if `timeTo + joinGraceMs >= sharedTimeTracker.streamTime` holds, it implies that `sharedTimeTracker.streamTime >= timeFrom` is `true`? And if `timeTo + joinGraceMs >= sharedTimeTracker.streamTime` is false, the whole condition is false independent of `sharedTimeTracker.streamTime >= timeFrom`?
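   
   A minimal sketch of the simplification in question. It assumes that stream time has already been advanced to at least the record timestamp before the check runs (the diff calls `advanceStreamTime(inputRecordTimestamp)` first) and that `joinBeforeMs` is non-negative, so `timeFrom <= streamTime` always holds. The class and method names are hypothetical stand-ins, not the actual `KStreamKStreamJoin` code:

```java
// Hypothetical stand-alone sketch; not the actual KStreamKStreamJoin code.
final class ActiveWindowSketch {

    // Two-sided check as written in the diff.
    static boolean twoSided(final long streamTime, final long timeFrom,
                            final long timeTo, final long joinGraceMs) {
        return streamTime >= timeFrom && timeTo + joinGraceMs >= streamTime;
    }

    // Simplified check: only the upper bound plus grace is compared against stream time.
    // Assumption: the caller guarantees streamTime >= timeFrom.
    static boolean upperBoundOnly(final long streamTime, final long timeTo,
                                  final long joinGraceMs) {
        return timeTo + joinGraceMs >= streamTime;
    }
}
```

   Under the stated assumption both methods return the same result for every input, which is why the first half of the condition looks redundant.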



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -128,7 +128,7 @@ public void process(final Record<K, V1> record) {
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
             sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
-
+            if (!isActiveWindow(timeFrom, timeTo)) return;

Review Comment:
   nit: code style -- we never use blocks without `{ }` -- this should be
   ```
       if (!isActiveWindow(timeFrom, timeTo)) {
           return;
       }
   ```
   
   Also, I think we should record this and call `StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())`, similar to the case of the key and/or value being `null`.
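   
   A rough sketch of the shape this could take, with the braces and the drop being counted. The class, its fields, and the plain counter are hypothetical stand-ins for the processor and the dropped-records sensor; the real code would call `StreamStreamJoinUtil.skipRecord(...)` where the counter is incremented:

```java
// Hypothetical stand-alone sketch; the counter stands in for the dropped-records sensor.
final class LateRecordGuardSketch {
    private final long joinAfterMs;
    private final long joinGraceMs;
    private long streamTime = 0L;
    private long droppedRecords = 0L;

    LateRecordGuardSketch(final long joinAfterMs, final long joinGraceMs) {
        this.joinAfterMs = joinAfterMs;
        this.joinGraceMs = joinGraceMs;
    }

    // Returns true if the record would proceed to the join, false if it is skipped as late.
    boolean process(final long inputRecordTimestamp) {
        final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
        // Assumption: stream time only ever moves forward.
        streamTime = Math.max(streamTime, inputRecordTimestamp);
        if (timeTo + joinGraceMs < streamTime) {
            droppedRecords++; // stand-in for StreamStreamJoinUtil.skipRecord(...)
            return false;
        }
        return true;
    }

    long droppedRecords() {
        return droppedRecords;
    }
}
```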



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -1363,6 +1365,16 @@ public void testWindowing() {
                 new KeyValueTimestamp<>(2, "L2+l2", 2002L),
                 new KeyValueTimestamp<>(3, "L3+l3", 2003L)
             );
+
+            //push two items with timestamp at grace edge; this should produce one join item, M0 is 'too late'
+            final long currentStreamTime = 2104;
+            final long lowerBound = currentStreamTime - timeDifference.toMillis() - grace.toMillis();
+            inputTopic1.pipeInput(0, "M0", lowerBound - 1);
+            inputTopic1.pipeInput(1, "M1", lowerBound + 1);

Review Comment:
   Why `+ 1` -- `lowerBound` by itself should already be inclusive and produce 
a join result?
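   
   The arithmetic can be checked by hand with the values from this test (`timeDifference` = 100 ms, `grace` = 104 ms, stream time 2104). The sketch below is a stand-alone illustration, assuming `joinAfterMs` equals `timeDifference` and that the late record does not advance stream time; the class and method names are hypothetical:

```java
// Hypothetical stand-alone sketch of the grace-edge arithmetic; values taken from the test diff.
final class GraceEdgeSketch {
    static final long CURRENT_STREAM_TIME = 2104L;
    static final long TIME_DIFFERENCE_MS = 100L; // timeDifference in testWindowing
    static final long GRACE_MS = 104L;           // grace in testWindowing

    // 2104 - 100 - 104 = 1900
    static final long LOWER_BOUND = CURRENT_STREAM_TIME - TIME_DIFFERENCE_MS - GRACE_MS;

    // Mirrors the upper-bound part of isActiveWindow from the PR.
    // Assumption: joinAfterMs == TIME_DIFFERENCE_MS.
    static boolean isWithinGrace(final long recordTimestamp) {
        final long timeTo = Math.max(0L, recordTimestamp + TIME_DIFFERENCE_MS);
        return timeTo + GRACE_MS >= CURRENT_STREAM_TIME;
    }
}
```

   With these numbers, `lowerBound - 1` (1899) gives `1999 + 104 = 2103 < 2104` and is late, while `lowerBound` itself (1900) gives `2000 + 104 = 2104 >= 2104` and is still within grace.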



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -1363,6 +1365,16 @@ public void testWindowing() {
                 new KeyValueTimestamp<>(2, "L2+l2", 2002L),
                 new KeyValueTimestamp<>(3, "L3+l3", 2003L)
             );
+
+            //push two items with timestamp at grace edge; this should produce one join item, M0 is 'too late'

Review Comment:
   We should add the "drop record sensor check == 0" before we intentionally push "late" data that should be dropped, and we should also test the sensor at the very end to verify that it recorded the dropped records.



##########
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##########
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")
+            ),
+            out.readKeyValuesToList()
+        );
     }
 
     @Test

Review Comment:
   Not related to this PR, but the new stream-table join with versioned state stores also has a grace period -- can we double check the KIP to see whether we decided to drop stream records for left-join when they arrive after the grace period, and maybe extend the test below (ie `testRelaxedLeftStreamTableJoin`) accordingly (in a follow-up PR)?
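   
   For the stream-stream case exercised in the diff above, the expected output can be verified by hand. The sketch below assumes that `WINDOW_60MS_GRACE_10MS` means a 60 ms time difference with a 10 ms grace period (inferred from the constant's name) and that stream time is the maximum timestamp seen so far; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not the actual join processor.
final class LeftJoinGraceSketch {
    static final long AFTER_MS = 60L; // assumed from WINDOW_60MS_GRACE_10MS
    static final long GRACE_MS = 10L; // assumed from WINDOW_60MS_GRACE_10MS

    // Returns the timestamps that survive the late-record check, in input order.
    static List<Long> accepted(final long... timestamps) {
        final List<Long> result = new ArrayList<>();
        long streamTime = 0L;
        for (final long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            final long timeTo = Math.max(0L, ts + AFTER_MS);
            if (timeTo + GRACE_MS >= streamTime) {
                result.add(ts);
            }
        }
        return result;
    }
}
```

   For the inputs 1, 90, 19, 20 from the test, stream time is 90 when the last two records arrive: `19 + 60 + 10 = 89 < 90` is dropped, and `20 + 60 + 10 = 90 >= 90` is kept.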



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -1651,7 +1663,7 @@ public void testAsymmetricWindowingBefore() {
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).before(ofMillis(100)),
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(0), ofMillis(4)).before(ofMillis(100)),

Review Comment:
   as above



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -816,10 +816,12 @@ public void testWindowing() {
         stream1 = builder.stream(topic1, consumed);
         stream2 = builder.stream(topic2, consumed);
 
+        final Duration timeDifference = ofMillis(100L);
+        final Duration grace = ofMillis(104);

Review Comment:
   I think we need a larger grace period -- this test aims to test window bounds, and no input data should be dropped as late. -- We should set it to `150`? -- Can we also add a check on the dropped record sensor that it does report zero? (Also cf my other comment below)



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -1382,7 +1394,7 @@ public void testAsymmetricWindowingAfter() {
         joined = stream1.join(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(0)).after(ofMillis(100)),
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(0), ofMillis(4)).after(ofMillis(100)),

Review Comment:
   As above: this test is an inner-join and set up to never drop records due to grace -- we should set a larger value and also check the "dropped record sensor".
   
   It would also make sense to add the same extension to send additional "late" 
data at the end of this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org