This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 87716ee923 fix: make sliding window works without grace period (#kafka-13739) (#11980)
87716ee923 is described below

commit 87716ee923d3be1f5f6c1f5647bbad7b2dbbea0f
Author: Bounkong Khamphousone <bounk...@gmail.com>
AuthorDate: Wed Apr 6 15:43:44 2022 +0200

    fix: make sliding window works without grace period (#kafka-13739) (#11980)
    
    backport of kafka-13739
---
 .../internals/KStreamSlidingWindowAggregate.java   |   2 +-
 .../KStreamSlidingWindowAggregateTest.java         | 304 +++++++++++++++++++++
 2 files changed, 305 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index ac4710e4c9..302cd064c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -473,7 +473,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
                                             final long inputRecordTimestamp) {
             final long windowStart = window.start();
             final long windowEnd = window.end();
-            if (windowEnd > closeTime) {
+            if (windowEnd >= closeTime) {
                 //get aggregate from existing window
                 final VAgg oldAgg = getValueOrNull(valueAndTime);
                 final VAgg newAgg = aggregator.apply(key, value, oldAgg);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 38c3fa7eeb..9eba607a02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -703,6 +703,310 @@ public class KStreamSlidingWindowAggregateTest {
         );
     }
 
+    @Test
+    public void testEarlyNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        // all events are considered early events since every record timestamp is less than the window's time difference (50 ms)
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 0L);
+            inputTopic.pipeInput("A", "2", 5L);
+            inputTopic.pipeInput("A", "3", 6L);
+            inputTopic.pipeInput("A", "4", 3L);
+            inputTopic.pipeInput("A", "5", 13L);
+            inputTopic.pipeInput("A", "6", 10L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : 
supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+        expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+        expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+        expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+        expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+        expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testNoGracePeriodSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+                );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic.pipeInput("A", "1", 100L);
+            inputTopic.pipeInput("A", "2", 105L);
+            inputTopic.pipeInput("A", "3", 106L);
+            inputTopic.pipeInput("A", "4", 103L);
+            inputTopic.pipeInput("A", "5", 113L);
+            inputTopic.pipeInput("A", "6", 110L);
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+        for (final KeyValueTimestamp<Windowed<String>, String> entry : 
supplier.theCapturedProcessor().processed()) {
+            final Windowed<String> window = entry.key();
+            final Long start = window.window().start();
+            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+            if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+                actual.replace(start, valueAndTimestamp);
+            }
+        }
+
+        final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+        expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+        expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+        expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+        expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+        expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+        expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+        expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+        expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testEarlyNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 
500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", 
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 0L);
+            inputTopic1.pipeInput("E", "3", 5L);
+            inputTopic1.pipeInput("E", "4", 6L);
+            inputTopic1.pipeInput("E", "2", 3L);
+            inputTopic1.pipeInput("E", "6", 13L);
+            inputTopic1.pipeInput("E", "5", 10L);
+            inputTopic1.pipeInput("E", "7", 4L);
+            inputTopic1.pipeInput("E", "8", 2L);
+            inputTopic1.pipeInput("E", "9", 15L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = 
supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@0
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1", 0),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(0, 
10)), "0+1+3+4+2", 6),
+                // E@5
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3", 5),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3+4", 6),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(1, 
11)), "0+3+4+2", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(3, 
13)), "0+3+4+2+6+5+7", 13),
+                // E@3
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6+5", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(4, 
14)), "0+3+4+6+5+7", 13),
+                //E@4
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 
15)), "0+3+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(5, 
15)), "0+3+4+6+5+9", 15),
+                // E@6
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4", 6),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(6, 
16)), "0+4+6+5+9", 15),
+                //E@13
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6", 13),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6+5", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(7, 
17)), "0+6+5+9", 15),
+                //E@10
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 
21)), "0+6", 13),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(11, 
21)), "0+6+9", 15),
+                //E@15
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(14, 
24)), "0+9", 15)),
+            actual
+        );
+    }
+
+    @Test
+    public void testNoGracePeriodLargeInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier =
+                inOrderIterator
+                        ? new InOrderMemoryWindowStoreSupplier("InOrder", 
500L, 10L, false)
+                        : Stores.inMemoryWindowStore("Reverse", 
Duration.ofMillis(500), Duration.ofMillis(10), false);
+
+        final KTable<Windowed<String>, String> table2 = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(10)))
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.as(storeSupplier)
+                );
+
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+        table2.toStream().process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic1 =
+                    driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+
+            inputTopic1.pipeInput("E", "1", 100L);
+            inputTopic1.pipeInput("E", "3", 105L);
+            inputTopic1.pipeInput("E", "4", 106L);
+            inputTopic1.pipeInput("E", "2", 103L);
+            inputTopic1.pipeInput("E", "6", 113L);
+            inputTopic1.pipeInput("E", "5", 110L);
+            inputTopic1.pipeInput("E", "7", 104L);
+            inputTopic1.pipeInput("E", "8", 102L);
+            inputTopic1.pipeInput("E", "9", 115L);
+        }
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
+                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().key())
+                        .thenComparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().window().start());
+
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> actual = 
supplier.theCapturedProcessor().processed();
+        actual.sort(comparator);
+        assertEquals(
+            asList(
+                // E@100
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(90, 
100)), "0+1", 100),
+                // E@105
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(95, 
105)), "0+1+3", 105),
+                // E@106
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 
106)), "0+1+3+4", 106),
+                // E@103
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(96, 
106)), "0+1+3+4+2", 106),
+                // E@105
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3", 105),
+                // E@106
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3+4", 106),
+                // E@103
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(101, 111)), "0+3+4+2", 106),
+                // E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6", 113),
+                // E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6+5", 113),
+                // E@104
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(103, 113)), "0+3+4+2+6+5+7", 113),
+                // E@103
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4", 106),
+                // E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6", 113),
+                // E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6+5", 113),
+                // E@104
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(104, 114)), "0+3+4+6+5+7", 113),
+                // E@104
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(105, 115)), "0+3+4+6+5", 113),
+                // E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(105, 115)), "0+3+4+6+5+9", 115),
+                // E@106
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4", 106),
+                // E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6", 113),
+                // E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6+5", 113),
+                // E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(106, 116)), "0+4+6+5+9", 115),
+                // E@113
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6", 113),
+                // E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6+5", 113),
+                // E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(107, 117)), "0+6+5+9", 115),
+                // E@110
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(111, 121)), "0+6", 113),
+                // E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(111, 121)), "0+6+9", 115),
+                // E@115
+                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(114, 124)), "0+9", 115)),
+            actual
+        );
+    }
+
     @Test
     public void shouldLogAndMeterWhenSkippingNullKey() {
         final String builtInMetricsVersion = StreamsConfig.METRICS_LATEST;

Reply via email to