ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator 
=

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no 
need to force a particular inter-key ordering. The only ordering that would 
change is the updates to windows of different start times, which was arbitrary 
to begin with. The ordering that does matter -- intra-key ordering, ie updates 
with the same key and window start time -- isn't affected. Final results still 
come last, which is the important thing

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new 
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the 
window's "event time", ie the maximum timestamp of a record in the window. But 
in retrospect I don't see any correctness issues here: for one thing, as I 
mentioned in the other comment, this only affects the relative ordering of 
updates with different windowed keys. And there's no ordering guarantees 
between keys. It also shouldn't even have any impact on advancing stream-time 
and potentially dropping some windows due to grace

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator 
=

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no 
need to force a particular inter-key ordering. The only ordering that would 
change is the updates to windows of different start times, which was arbitrary 
to begin with. The ordering that does matter -- intra-key ordering, ie updates 
with the same key and window start time -- isn't affected. Final results still 
come last, which is the important thing

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new 
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the 
window's "event time", ie the maximum timestamp of a record in the window. But 
in retrospect I don't see any correctness issues here: for one thing, as I 
mentioned in the other comment, this only affects the relative ordering of 
updates with different windowed keys. And there's no ordering guarantees 
between keys. It also shouldn't even have any impact on advancing stream-time 
and potentially dropping some windows due to grace

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator 
=

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no 
need to force a particular inter-key ordering. The only ordering that would 
change is the updates to windows of different start times, which was arbitrary 
to begin with. The ordering that does matter -- intra-key ordering, ie updates 
with the same key and window start time -- isn't affected. Final results still 
come last, which is the important thing

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new 
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the 
window's "event time", ie the maximum timestamp of a record in the window. But 
in retrospect I don't see any correctness issues here: for one thing, as I 
mentioned in the other comment, this only affects the relative ordering of 
updates with different windowed keys. And there's no ordering guarantees 
between keys. It also shouldn't even have any impact on advancing stream-time 
and potentially dropping some windows due to grace

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator 
=

Review comment:
       Sorry, I realize I never replied to your reply. I definitely agree, no 
need to force a particular inter-key ordering. The only ordering that would 
change is the updates to windows of different start times, which was arbitrary 
to begin with. The ordering that does matter -- intra-key ordering, ie updates 
with the same key and window start time -- isn't affected. Final results still 
come last, which is the important thing

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new 
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       I knew John would know what's up with the weird type nonsense 😛 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Actually, I think the timestamp of the forwarded results is now the 
window's "event time", ie the maximum timestamp of a record in the window. But 
in retrospect I don't see any correctness issues here: for one thing, as I 
mentioned in the other comment, this only affects the relative ordering of 
updates with different windowed keys. And there's no ordering guarantees 
between keys. It also shouldn't even have any impact on advancing stream-time 
and potentially dropping some windows due to grace




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to