ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final
MockProcessorSupplier<Windowed<String>
inputTopic.pipeInput("2", "B", 1000L);
inputTopic.pipeInput("3", "C", 600L);
}
- assertThat(supplier.theCapturedProcessor().processed(),
equalTo(Arrays.asList(
- // processing A@500
- new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L,
500L)), 1L, 500L),
- // processing A@999
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 1L, 999L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(499L, 999L)), 2L, 999L),
- // processing A@600
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(499L, 999L)), 3L, 999L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 2L, 999L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(100L, 600L)), 2L, 600L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(601L, 1101L)), 1L, 999L),
- // processing B@500
- new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L,
500L)), 1L, 500L),
- // processing B@600
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 1L, 600L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(100L, 600L)), 2L, 600L),
- // processing B@700
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 2L, 700L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(601L, 1101L)), 1L, 700L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(200L, 700L)), 3L, 700L),
- // processing C@501
- new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L,
501L)), 1L, 501L),
- // processing first A@1000
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(601L, 1101L)), 2L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(1000L, 1500L)), 1L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(500L, 1000L)), 4L, 1000L),
- // processing second A@1000
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(500L, 1000L)), 5L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 4L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(601L, 1101L)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(1000L, 1500L)), 2L, 1000L),
- // processing first B@1000
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(601L, 1101)), 2L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(701L, 1201L)), 1L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(500L, 1000L)), 4L, 1000L),
- // processing second B@1000
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(500L, 1000L)), 5L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 4L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(601L, 1101)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(701L, 1201L)), 2L, 1000L),
- // processing C@600
- new KeyValueTimestamp<>(new Windowed<>("3", new
TimeWindow(502L, 1002L)), 1L, 600L),
- new KeyValueTimestamp<>(new Windowed<>("3", new
TimeWindow(100L, 600L)), 2L, 600L)
+ final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator
=
Review comment:
Sorry, I realize I never replied to your reply. I definitely agree, there is
no need to force a particular inter-key ordering. The only ordering that would
change is that of updates to windows with different start times, which was
arbitrary to begin with. The ordering that does matter -- intra-key ordering,
i.e. updates with the same key and window start time -- isn't affected. Final
results still come last, which is the important thing.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
public void testAggregateSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
-
- final KTable<Windowed<String>, String> table = builder
- .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(50)))
- .aggregate(
- MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
- );
+ final KTable<Windowed<String>, String> table;
+ if (inOrderIterator) {
+ table = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(new
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+ );
+ } else {
+ table = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
Review comment:
I knew John would know what's up with the weird type nonsense 😛
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
inputTopic.pipeInput("k1", "v1", 7L);
// final record to advance stream time and flush windows
inputTopic.pipeInput("k1", "v1", 90L);
+ final Comparator<TestRecord<String, Long>> comparator =
Review comment:
Actually, I think the timestamp of the forwarded results is now the
window's "event time", i.e. the maximum timestamp of a record in the window.
But in retrospect I don't see any correctness issues here: for one thing, as I
mentioned in the other comment, this only affects the relative ordering of
updates with different windowed keys, and there are no ordering guarantees
between keys. It also shouldn't have any impact on advancing stream-time
and potentially dropping some windows once the grace period has elapsed.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final
MockProcessorSupplier<Windowed<String>
inputTopic.pipeInput("2", "B", 1000L);
inputTopic.pipeInput("3", "C", 600L);
}
- assertThat(supplier.theCapturedProcessor().processed(),
equalTo(Arrays.asList(
- // processing A@500
- new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L,
500L)), 1L, 500L),
- // processing A@999
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 1L, 999L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(499L, 999L)), 2L, 999L),
- // processing A@600
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(499L, 999L)), 3L, 999L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 2L, 999L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(100L, 600L)), 2L, 600L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(601L, 1101L)), 1L, 999L),
- // processing B@500
- new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L,
500L)), 1L, 500L),
- // processing B@600
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 1L, 600L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(100L, 600L)), 2L, 600L),
- // processing B@700
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 2L, 700L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(601L, 1101L)), 1L, 700L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(200L, 700L)), 3L, 700L),
- // processing C@501
- new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L,
501L)), 1L, 501L),
- // processing first A@1000
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(601L, 1101L)), 2L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(1000L, 1500L)), 1L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(500L, 1000L)), 4L, 1000L),
- // processing second A@1000
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(500L, 1000L)), 5L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(501L, 1001L)), 4L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(601L, 1101L)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("1", new
TimeWindow(1000L, 1500L)), 2L, 1000L),
- // processing first B@1000
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(601L, 1101)), 2L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(701L, 1201L)), 1L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(500L, 1000L)), 4L, 1000L),
- // processing second B@1000
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(500L, 1000L)), 5L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(501L, 1001L)), 4L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(601L, 1101)), 3L, 1000L),
- new KeyValueTimestamp<>(new Windowed<>("2", new
TimeWindow(701L, 1201L)), 2L, 1000L),
- // processing C@600
- new KeyValueTimestamp<>(new Windowed<>("3", new
TimeWindow(502L, 1002L)), 1L, 600L),
- new KeyValueTimestamp<>(new Windowed<>("3", new
TimeWindow(100L, 600L)), 2L, 600L)
+ final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator
=
Review comment:
Sorry, I realize I never replied to your reply. I definitely agree, there is
no need to force a particular inter-key ordering. The only ordering that would
change is that of updates to windows with different start times, which was
arbitrary to begin with. The ordering that does matter -- intra-key ordering,
i.e. updates with the same key and window start time -- isn't affected. Final
results still come last, which is the important thing.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
public void testAggregateSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
-
- final KTable<Windowed<String>, String> table = builder
- .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(50)))
- .aggregate(
- MockInitializer.STRING_INIT,
- MockAggregator.TOSTRING_ADDER,
- Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
- );
+ final KTable<Windowed<String>, String> table;
+ if (inOrderIterator) {
+ table = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.as(new
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+ );
+ } else {
+ table = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10),
ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
Review comment:
I knew John would know what's up with the weird type nonsense 😛
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
inputTopic.pipeInput("k1", "v1", 7L);
// final record to advance stream time and flush windows
inputTopic.pipeInput("k1", "v1", 90L);
+ final Comparator<TestRecord<String, Long>> comparator =
Review comment:
Actually, I think the timestamp of the forwarded results is now the
window's "event time", i.e. the maximum timestamp of a record in the window.
But in retrospect I don't see any correctness issues here: for one thing, as I
mentioned in the other comment, this only affects the relative ordering of
updates with different windowed keys, and there are no ordering guarantees
between keys. It also shouldn't have any impact on advancing stream-time
and potentially dropping some windows once the grace period has elapsed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org