ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ########## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier<Windowed<String> inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } - assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList( - // processing A@500 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), - // processing A@999 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), - // processing A@600 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L), - // processing B@500 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L), - // processing B@600 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L), - // processing B@700 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L), - // processing C@501 - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L), - // processing first A@1000 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), - // processing second A@1000 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), - // processing first B@1000 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L), - // processing second B@1000 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L), - // processing C@600 - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L), - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L) + final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator = Review comment: Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, ie updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -78,16 +100,28 @@ public void testAggregateSmallInput() { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; - - final KTable<Windowed<String>, String> table = builder - .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) - .aggregate( - MockInitializer.STRING_INIT, - MockAggregator.TOSTRING_ADDER, - Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) - ); + final KTable<Windowed<String>, String> table; + if (inOrderIterator) { + table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)) + ); + } else { + table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) Review comment: I knew John would know what's up with the weird type nonsense 😛 ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java ########## @@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() { inputTopic.pipeInput("k1", "v1", 7L); // final record to advance stream time and flush windows inputTopic.pipeInput("k1", "v1", 90L); + final Comparator<TestRecord<String, Long>> comparator = Review comment: Actually, I think the timestamp of the forwarded results is now the window's "event time", ie the maximum timestamp of a record in the window. But in retrospect I don't see any correctness issues here: for one thing, as I mentioned in the other comment, this only affects the relative ordering of updates with different windowed keys. And there's no ordering guarantees between keys. It also shouldn't even have any impact on advancing stream-time and potentially dropping some windows due to grace ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ########## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier<Windowed<String> inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } - assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList( - // processing A@500 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), - // processing A@999 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), - // processing A@600 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L), - // processing B@500 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L), - // processing B@600 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L), - // processing B@700 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L), - // processing C@501 - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L), - // processing first A@1000 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), - // processing second A@1000 - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), - // processing first B@1000 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L), - // processing second B@1000 - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L), - // processing C@600 - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L), - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L) + final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator = Review comment: Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, ie updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ########## @@ -78,16 +100,28 @@ public void testAggregateSmallInput() { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; - - final KTable<Windowed<String>, String> table = builder - .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) - .aggregate( - MockInitializer.STRING_INIT, - MockAggregator.TOSTRING_ADDER, - Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) - ); + final KTable<Windowed<String>, String> table; + if (inOrderIterator) { + table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.as(new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)) + ); + } else { + table = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) Review comment: I knew John would know what's up with the weird type nonsense 😛 ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java ########## @@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() { inputTopic.pipeInput("k1", "v1", 7L); // final record to advance stream time and flush windows inputTopic.pipeInput("k1", "v1", 90L); + final Comparator<TestRecord<String, Long>> comparator = Review comment: Actually, I think the timestamp of the forwarded results is now the window's "event time", ie the maximum timestamp of a record in the window. But in retrospect I don't see any correctness issues here: for one thing, as I mentioned in the other comment, this only affects the relative ordering of updates with different windowed keys. And there's no ordering guarantees between keys. It also shouldn't even have any impact on advancing stream-time and potentially dropping some windows due to grace ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org