lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r485623786



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator 
=

Review comment:
       Because we're outputting all the intermediate data, the order of the 
internal windows isn't the same when you use the reverse iterator versus the 
in-order iterator. This is mainly an issue for large tests: when there are 
a lot of windows to update as a new record comes in, the reverse iterator looks 
at them from largest start time to smallest start time, and the in-order 
iterator looks at them from smallest start time to largest start time. I 
suppose we could alter the algorithm to output results differently, but getting 
a different order didn't seem too harmful for users. WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to