mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1511998434


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -438,13 +438,13 @@ public void testOrdering() {
             inputTopic1.pipeInput(1, "A1", 100L);
             processor.checkAndClearProcessResult();
 
-            // push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-            // the joined records
-            // by the time they were produced before
+            // push one item to the other window that has a join;
+            // this should produce the joined record first;
+            // then the not-joined record

Review Comment:
   This change to the comment needs to be rolled back, right? We indeed produce 
the left-null join result first.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -884,11 +886,13 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
             processor.checkAndClearProcessResult();
 
-            // push one item to the first stream; this should produce one 
full-join item
+            // push one item to the first stream;
+            // this should produce one inner-join item;
+            // and a right-joined item for a3

Review Comment:
   We don't produce output for `a3` 



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##########
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
         }
     }
 
+    @Test
+    public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                    driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
+
+            processor.init(null);
+            // push four items with increasing timestamps to the primary 
stream; this should emit null-joined items
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 
3:B3 (ts: 1003) }

Review Comment:
   Why are we using `B` not `A` for left input?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##########
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
         }
     }
 
+    @Test
+    public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {

Review Comment:
   Why do we need this test case? Seems it's fully contained in 
`testLeftJoinedRecordsWithZeroAfterAreEmitted` below?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##########
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
         }
     }
 
+    @Test
+    public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                    driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
+
+            processor.init(null);
+            // push four items with increasing timestamps to the primary 
stream; this should emit null-joined items

Review Comment:
   `his should emit [three] null-joined items; the last one does not emit yet`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to