florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667829


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() {
         }
     }
 
+    @Test
+    public void recordsArrivingPostWindowCloseShouldBeDropped() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> joined = builder.stream(topic1, 
consumed).join(
+            builder.stream(topic2, consumed),
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        joined.process(supplier);
+
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> left =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> right =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
+
+            left.pipeInput(0, "left", 15);
+            right.pipeInput(-1, "bumpTime", 40);
+            assertRecordDropCount(0.0, processor);
+
+            right.pipeInput(0, "closesAt39", 24);

Review Comment:
   Thanks, ok, I adjuted the 'hint' in the value accordingly.
   I don't think we have off-by-one issue here: `[14;34 + 5]` so the record is 
considered 'too late' at t=40?
   In other words for this test case it was purely a misleading 'hint'?
   
   On a different note, I deleted the test case in `KStreamKStreamJoinTest` and 
refer to `KStreamKStreamWindowCloseTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to