This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push: new 87716ee923 fix: make sliding window works without grace period (#kafka-13739) (#11980) 87716ee923 is described below commit 87716ee923d3be1f5f6c1f5647bbad7b2dbbea0f Author: Bounkong Khamphousone <bounk...@gmail.com> AuthorDate: Wed Apr 6 15:43:44 2022 +0200 fix: make sliding window works without grace period (#kafka-13739) (#11980) backport of kafka-13739 --- .../internals/KStreamSlidingWindowAggregate.java | 2 +- .../KStreamSlidingWindowAggregateTest.java | 304 +++++++++++++++++++++ 2 files changed, 305 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java index ac4710e4c9..302cd064c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java @@ -473,7 +473,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg final long inputRecordTimestamp) { final long windowStart = window.start(); final long windowEnd = window.end(); - if (windowEnd > closeTime) { + if (windowEnd >= closeTime) { //get aggregate from existing window final VAgg oldAgg = getValueOrNull(valueAndTime); final VAgg newAgg = aggregator.apply(key, value, oldAgg); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java index 38c3fa7eeb..9eba607a02 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java @@ -703,6 +703,310 @@ public class KStreamSlidingWindowAggregateTest { ); } + @Test + public void testEarlyNoGracePeriodSmallInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + + // all events are considered as early events since record timestamp is less than time difference of the window + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + + inputTopic.pipeInput("A", "1", 0L); + inputTopic.pipeInput("A", "2", 5L); + inputTopic.pipeInput("A", "3", 6L); + inputTopic.pipeInput("A", "4", 3L); + inputTopic.pipeInput("A", "5", 13L); + inputTopic.pipeInput("A", "6", 10L); + } + + final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); + for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) { + final Windowed<String> window = entry.key(); + final Long start = window.window().start(); + final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp()); + if (actual.putIfAbsent(start, valueAndTimestamp) != null) { + actual.replace(start, valueAndTimestamp); + } + } + + final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); + expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L)); + expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L)); + expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L)); + expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L)); + expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L)); + expected.put(11L, ValueAndTimestamp.make("0+5", 13L)); + + assertEquals(expected, actual); + } + + @Test + public void testNoGracePeriodSmallInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String()) + ); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + + inputTopic.pipeInput("A", "1", 100L); + inputTopic.pipeInput("A", "2", 105L); + inputTopic.pipeInput("A", "3", 106L); + inputTopic.pipeInput("A", "4", 103L); + inputTopic.pipeInput("A", "5", 113L); + inputTopic.pipeInput("A", "6", 110L); + } + + final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>(); + for (final KeyValueTimestamp<Windowed<String>, String> entry : supplier.theCapturedProcessor().processed()) { + final Windowed<String> window = entry.key(); + final Long start = window.window().start(); + final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(entry.value(), entry.timestamp()); + if (actual.putIfAbsent(start, valueAndTimestamp) != null) { + actual.replace(start, valueAndTimestamp); + } + } + + final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>(); + expected.put(50L, ValueAndTimestamp.make("0+1", 100L)); + expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L)); + expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L)); + expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L)); + expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L)); + expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L)); + expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L)); + expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L)); + expected.put(111L, ValueAndTimestamp.make("0+5", 113L)); + + assertEquals(expected, actual); + } + + @Test + public void testEarlyNoGracePeriodLargeInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + final WindowBytesStoreSupplier storeSupplier = + inOrderIterator + ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false) + : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false); + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.as(storeSupplier) + ); + + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic1 = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + + inputTopic1.pipeInput("E", "1", 0L); + inputTopic1.pipeInput("E", "3", 5L); + inputTopic1.pipeInput("E", "4", 6L); + inputTopic1.pipeInput("E", "2", 3L); + inputTopic1.pipeInput("E", "6", 13L); + inputTopic1.pipeInput("E", "5", 10L); + inputTopic1.pipeInput("E", "7", 4L); + inputTopic1.pipeInput("E", "8", 2L); + inputTopic1.pipeInput("E", "9", 15L); + } + final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator = + Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key()) + .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start()); + + final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed(); + actual.sort(comparator); + assertEquals( + asList( + // E@0 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1", 0), + // E@5 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3", 5), + // E@6 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4", 6), + // E@3 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 10)), "0+1+3+4+2", 6), + // E@5 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3", 5), + // E@6 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4", 6), + // E@3 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 11)), "0+3+4+2", 6), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6", 13), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5", 13), + //E@4 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 13)), "0+3+4+2+6+5+7", 13), + // E@3 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4", 6), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6", 13), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5", 13), + //E@4 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 14)), "0+3+4+6+5+7", 13), + //E@4 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5", 13), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 15)), "0+3+4+6+5+9", 15), + // E@6 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4", 6), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6", 13), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5", 13), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 16)), "0+4+6+5+9", 15), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6", 13), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5", 13), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 17)), "0+6+5+9", 15), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6", 13), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 21)), "0+6+9", 15), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 24)), "0+9", 15)), + actual + ); + } + + @Test + public void testNoGracePeriodLargeInput() { + final StreamsBuilder builder = new StreamsBuilder(); + final String topic = "topic"; + final WindowBytesStoreSupplier storeSupplier = + inOrderIterator + ? new InOrderMemoryWindowStoreSupplier("InOrder", 500L, 10L, false) + : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(500), Duration.ofMillis(10), false); + + final KTable<Windowed<String>, String> table2 = builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10))) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.as(storeSupplier) + ); + + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<String, String> inputTopic1 = + driver.createInputTopic(topic, new StringSerializer(), new StringSerializer()); + + inputTopic1.pipeInput("E", "1", 100L); + inputTopic1.pipeInput("E", "3", 105L); + inputTopic1.pipeInput("E", "4", 106L); + inputTopic1.pipeInput("E", "2", 103L); + inputTopic1.pipeInput("E", "6", 113L); + inputTopic1.pipeInput("E", "5", 110L); + inputTopic1.pipeInput("E", "7", 104L); + inputTopic1.pipeInput("E", "8", 102L); + inputTopic1.pipeInput("E", "9", 115L); + } + final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator = + Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key()) + .thenComparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().window().start()); + + final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = supplier.theCapturedProcessor().processed(); + actual.sort(comparator); + assertEquals( + asList( + // E@0 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90, 100)), "0+1", 100), + // E@5 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95, 105)), "0+1+3", 105), + // E@6 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4", 106), + // E@3 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 106)), "0+1+3+4+2", 106), + // E@5 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3", 105), + // E@6 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4", 106), + // E@3 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(101, 111)), "0+3+4+2", 106), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6", 113), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5", 113), + //E@4 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113), + // E@3 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4", 106), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6", 113), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5", 113), + //E@4 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(104, 114)), "0+3+4+6+5+7", 113), + //E@4 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5", 113), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(105, 115)), "0+3+4+6+5+9", 115), + // E@6 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4", 106), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6", 113), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5", 113), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(106, 116)), "0+4+6+5+9", 115), + //E@13 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6", 113), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5", 113), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(107, 117)), "0+6+5+9", 115), + //E@10 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6", 113), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(111, 121)), "0+6+9", 115), + //E@15 + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(114, 124)), "0+9", 115)), + actual + ); + } + @Test public void shouldLogAndMeterWhenSkippingNullKey() { final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;