mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1515212230


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##########
@@ -25,36 +25,46 @@
 
 public final class StreamStreamJoinUtil {
 
-    private StreamStreamJoinUtil(){
+    private StreamStreamJoinUtil() {
     }
 
     public static <KIn, VIn, KOut, VOut> boolean skipRecord(
         final Record<KIn, VIn> record, final Logger logger,
         final Sensor droppedRecordsSensor,
-        final ProcessorContext<KOut, VOut> context) {
+        final ProcessorContext<KOut, VOut> context
+    ) {
         // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
         if (record.key() == null || record.value() == null) {
-            if (context.recordMetadata().isPresent()) {
-                final RecordMetadata recordMetadata = context.recordMetadata().get();
-                logger.warn(
-                    "Skipping record due to null key or value. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
-                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                );
-            } else {
-                logger.warn(
-                    "Skipping record due to null key or value. Topic, partition, and offset not known."
-                );
-                );
-            }
-            droppedRecordsSensor.record();
+            logSkip("null key or value", logger, droppedRecordsSensor, context);
             return true;
         } else {
             return false;
         }
     }
+
+    public static <KOut, VOut> void logSkip(
+        final String reason,
+        final Logger logger,
+        final Sensor droppedRecordsSensor,
+        final ProcessorContext<KOut, VOut> context
+    ) {
+        if (context.recordMetadata().isPresent()) {
+            final RecordMetadata recordMetadata = context.recordMetadata().get();
+            logger.warn(
+                "Skipping record. reason=[{}] topic=[{}] partition=[{}] offset=[{}]",

Review Comment:
   Nit: Comparing to 
`AbstractKStreamTimeWindowAggregateProcessor#logSkippedRecordForExpiredWindow`, 
it seems we could add more information -- should we really merge both cases 
behind a single "skip reason" as your PR proposes, or should we have two 
different outputs: one for the null-key case, and a different one for the 
"expired" case, similar to windowed aggregation?



##########
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##########
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   Why is this record already in the output? We should not drop it, but it 
seems we cannot emit it right away either, because we need to wait until the 
window closes. So would we need to pipe one more record with ts=91 to flush 
out this result?
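
   To make the timing concrete, here is a minimal sketch of the arithmetic, not 
the actual Kafka Streams implementation. It assumes that `WINDOW_60MS_GRACE_10MS` 
means a symmetric 60ms time difference with 10ms grace, that the window around a 
record is `[ts - 60; ts + 60]` with both bounds inclusive, and that it closes one 
millisecond after window end plus grace. The class and method names are 
hypothetical.

```java
public final class LateRecordCheck {

    // Assumption: the join window of a record is [ts - windowMs; ts + windowMs],
    // both bounds inclusive, and it closes at the first stream time that is
    // greater than (window end + grace).
    static long closeTime(final long ts, final long windowMs, final long graceMs) {
        return ts + windowMs + graceMs + 1;
    }

    // A record counts as expired once stream time has reached its close time.
    static boolean isExpired(final long ts, final long windowMs, final long graceMs, final long streamTime) {
        return streamTime >= closeTime(ts, windowMs, graceMs);
    }

    public static void main(final String[] args) {
        final long streamTime = 90; // highest timestamp piped so far in the test
        System.out.println("ts=19 expired: " + isExpired(19, 60, 10, streamTime));
        System.out.println("ts=20 expired: " + isExpired(20, 60, 10, streamTime));
        System.out.println("ts=20 closes at: " + closeTime(20, 60, 10));
    }
}
```

   Under these assumptions the record at ts=19 is already expired at stream time 
90, while the record at ts=20 stays open until stream time reaches 91.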



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() {
         }
     }
 
+    @Test
+    public void recordsArrivingPostWindowCloseShouldBeDropped() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> joined = builder.stream(topic1, consumed).join(
+            builder.stream(topic2, consumed),
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        joined.process(supplier);
+
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> left =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> right =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            left.pipeInput(0, "left", 15);
+            right.pipeInput(-1, "bumpTime", 40);
+            assertRecordDropCount(0.0, processor);
+
+            right.pipeInput(0, "closesAt39", 24);

Review Comment:
   This should be `closeAt40`? The window is from `[24-10; 24+10] = [14;34]` 
(note, both bounds inclusive), so with zero grace, we would close at 35, but 
with 5 grace, we close at 40.
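
   The same arithmetic as a small standalone sketch (hypothetical class and 
method names, not Kafka Streams code), assuming inclusive bounds on both sides 
and a close time one millisecond past window end plus grace:

```java
public final class JoinWindowBounds {

    // Inclusive window bounds around a record timestamp: [ts - beforeMs; ts + afterMs].
    static long[] window(final long ts, final long beforeMs, final long afterMs) {
        return new long[] {ts - beforeMs, ts + afterMs};
    }

    // Assumption: the window closes at the first timestamp after (window end + grace).
    static long closeTime(final long windowEnd, final long graceMs) {
        return windowEnd + graceMs + 1;
    }

    public static void main(final String[] args) {
        final long[] w = window(24, 10, 10);
        System.out.println("window = [" + w[0] + ";" + w[1] + "]");
        System.out.println("close with zero grace = " + closeTime(w[1], 0));
        System.out.println("close with 5ms grace  = " + closeTime(w[1], 5));
    }
}
```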



##########
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##########
@@ -91,13 +101,22 @@ void testRelaxedLeftStreamTableJoin() {
     @Test
     void testRelaxedOuterStreamStreamJoin() {
         leftStream
-            .outerJoin(rightStream, JOINER, WINDOW)
+            .outerJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
         right.pipeInput(null, "rightValue", 1);
-        left.pipeInput(null, "leftValue");
+        left.pipeInput(null, "leftValue", 90);
+        left.pipeInput(null, "lateArrival-dropped", 19);
+        right.pipeInput(null, "lateArrival-dropped", 19);
+        right.pipeInput(null, "lateArrivalWithinGrace", 20);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
         assertEquals(
-            Arrays.asList(new KeyValue<>(null, "null|rightValue"), new KeyValue<>(null, "leftValue|null")),
+            Arrays.asList(
+                new KeyValue<>(null, "null|rightValue"),
+                new KeyValue<>(null, "leftValue|null"),
+                new KeyValue<>(null, "null|lateArrivalWithinGrace"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   As above for the last two records -- we should not see them yet without 
advancing time to `91`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
