ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r468218540



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class SlidingWindowedCogroupedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedCogroupedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+    private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns;
+
+    SlidingWindowedCogroupedKStreamImpl(final SlidingWindows windows,
+                                        final InternalStreamsBuilder builder,
+                                        final Set<String> subTopologySourceNodes,
+                                        final String name,
+                                        final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                                        final StreamsGraphNode streamsGraphNode,
+                                        final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
+        super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
+        //keySerde and valueSerde are null because there are many different groupStreams that they could be from
+        this.windows = windows;
+        this.aggregateBuilder = aggregateBuilder;
+        this.groupPatterns = groupPatterns;
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer) {
+        return aggregate(initializer, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named) {
+        return aggregate(initializer, named, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(
+                materialized,
+                builder,
+                CogroupedKStreamImpl.AGGREGATE_NAME);
+        return aggregateBuilder.build(
+                groupPatterns,
+                initializer,
+                new NamedInternal(named),
+                materialize(materializedInternal),
+                materializedInternal.keySerde() != null ?
+                        new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs())
+                        : null,
+                materializedInternal.valueSerde(),
+                materializedInternal.queryableStoreName(),
+                null,
+                windows,
+                null,
+                null);
+    }
+
+    private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
+            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {

Review comment:
       I think we usually leave the arguments on the same line as the method declaration (even if that line ends up way too long)
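Applied to the method above, that would look something like this (a sketch of the suggested formatting only, not a committed change):

```java
// First parameter stays on the declaration line, even though the generics make it long.
private StoreBuilder<TimestampedWindowStore<K, V>> materialize(final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
    // ... body unchanged
}
```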

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -77,82 +80,212 @@ public void before() {
         groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
     }
 
-    @Test(expected = NullPointerException.class)

Review comment:
       Awesome, thanks for cleaning up some of these older tests 😄 
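(For context, the cleanup referred to here is presumably the migration from the old `@Test(expected = ...)` style to `assertThrows` — a sketch, with the method name borrowed from the new tests:

```java
// Before: any NullPointerException thrown anywhere in the test body passes the test.
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
    groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
}

// After: the assertion is scoped to the one call that is expected to throw.
@Test
public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
    assertThrows(NullPointerException.class, () -> groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER));
}
```

The scoped form cannot be fooled by setup code that happens to throw the same exception, which the class-level annotation would silently accept.)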

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error

Review comment:
       Looks like this comment on the reverse case was left behind

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                          final String storeName,
+                                          final Initializer<Agg> initializer,
+                                          final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                        "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            //don't process records that don't fall within a full sliding window
+            if (timestamp < windows.timeDifferenceMs()) {
+                log.warn(
+                        "Skipping record due to early arrival. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+            processInOrder(key, value, timestamp);
+        }
+
+

Review comment:
       nit: extra line breaks

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class SlidingWindowedCogroupedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedCogroupedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+    private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns;
+
+    SlidingWindowedCogroupedKStreamImpl(final SlidingWindows windows,
+                                        final InternalStreamsBuilder builder,
+                                        final Set<String> subTopologySourceNodes,
+                                        final String name,
+                                        final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                                        final StreamsGraphNode streamsGraphNode,
+                                        final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
+        super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
+        //keySerde and valueSerde are null because there are many different groupStreams that they could be from
+        this.windows = windows;
+        this.aggregateBuilder = aggregateBuilder;
+        this.groupPatterns = groupPatterns;
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer) {
+        return aggregate(initializer, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named) {
+        return aggregate(initializer, named, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(
+                materialized,
+                builder,
+                CogroupedKStreamImpl.AGGREGATE_NAME);
+        return aggregateBuilder.build(
+                groupPatterns,
+                initializer,
+                new NamedInternal(named),
+                materialize(materializedInternal),
+                materializedInternal.keySerde() != null ?
+                        new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs())
+                        : null,
+                materializedInternal.valueSerde(),
+                materializedInternal.queryableStoreName(),
+                null,
+                windows,
+                null,
+                null);
+    }
+
+    private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
+            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {

Review comment:
       Kind of hard to tell, but is the alignment in this method a bit off? Might be good to just highlight and auto-indent everything; IntelliJ will take care of any issues if it's configured properly

##########
File path: checkstyle/suppressions.xml
##########
@@ -163,6 +163,12 @@
    <suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
               files="Murmur3.java"/>
 
+    <suppress checks="(NPathComplexity|MethodLength|CyclomaticComplexity)"
+              files="KStreamSlidingWindowAggregate.java"/>
+
+    <suppress checks="(CyclomaticComplexity)"
+              files="CogroupedStreamAggregateBuilder.java"/>
+

Review comment:
       You should check to make sure all of these are still needed. In particular I bet we can get rid of the CogroupedStreamAggregateBuilder suppression once your cleanup PR is merged and this one is rebased

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        windowedStream = stream.
+                groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
+    }
+
+    @Test
+    public void shouldCountSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .count()
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make(1L, 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make(2L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make(1L, 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make(2L, 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make(1L, 200L)));
+    }
+
+    @Test
+    public void shouldReduceSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make("1", 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make("2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("1+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make("3", 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make("10+20", 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("20", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make("10", 200L)));
+    }
+
+    @Test
+    public void shouldAggregateSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.with(Serdes.String(), Serdes.String()))
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make("0+1", 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make("0+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("0+1+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make("0+3", 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make("0+10+20", 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("0+20", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make("0+10", 200L)));
+    }
+
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(
+                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                        .withKeySerde(Serdes.String())
+                        .withValueSerde(Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, Long>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), 1L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), 2L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), 1L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), 1L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), 1L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), 2L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), 1L))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                        driver.getTimestampedWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), ValueAndTimestamp.make(1L, 100L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), ValueAndTimestamp.make(2L, 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), ValueAndTimestamp.make(1L, 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), ValueAndTimestamp.make(1L, 500L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), ValueAndTimestamp.make(1L, 150L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), ValueAndTimestamp.make(2L, 200L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), ValueAndTimestamp.make(1L, 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(
+                MockReducer.STRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                        .withKeySerde(Serdes.String())
+                        .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, String>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), "1"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), "1+2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), "2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), "3"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), "20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), "10+20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), "10"))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                        driver.getTimestampedWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), ValueAndTimestamp.make("1", 100L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), ValueAndTimestamp.make("1+2", 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), ValueAndTimestamp.make("2", 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), ValueAndTimestamp.make("3", 500L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), ValueAndTimestamp.make("20", 150L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), ValueAndTimestamp.make("10+20", 200L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), ValueAndTimestamp.make("10", 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldMaterializeAggregated() {
+        windowedStream.aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                        .withKeySerde(Serdes.String())
+                        .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, String>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), "0+1"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), "0+1+2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), "0+2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), "0+3"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), "0+20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), "0+10+20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), "0+10"))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                        driver.getTimestampedWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), ValueAndTimestamp.make("0+1", 100L)),

Review comment:
       The input is the same for each test so the output is too, right? Maybe we can pull all the output verification into a single method
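A sketch of what that could look like (hypothetical helper name, assuming the shared input data stays the same across tests):

```java
// Hypothetical helper: one assertion per windowed key, reusable across the
// count/reduce/aggregate tests since they all process identical input.
private <V> void assertWindowedOutput(final MockProcessorSupplier<Windowed<String>, V> supplier,
                                      final String key,
                                      final long windowStart,
                                      final long windowEnd,
                                      final V expectedValue,
                                      final long expectedTimestamp) {
    assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey
                    .get(new Windowed<>(key, new TimeWindow(windowStart, windowEnd))),
            equalTo(ValueAndTimestamp.make(expectedValue, expectedTimestamp)));
}
```

Each test would then shrink to seven one-line calls, e.g. `assertWindowedOutput(supplier, "1", 0L, 100L, 1L, 100L);` for the count case.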

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        windowedStream = stream.
+                groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
+    }
+
+    @Test
+    public void shouldCountSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .count()
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make(1L, 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make(2L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make(1L, 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make(2L, 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make(1L, 200L)));
+    }
+
+    @Test
+    public void shouldReduceSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make("1", 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make("2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("1+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make("3", 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make("10+20", 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("20", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make("10", 200L)));
+    }
+
+    @Test
+    public void shouldAggregateSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.with(Serdes.String(), Serdes.String()))
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make("0+1", 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make("0+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("0+1+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make("0+3", 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make("0+10+20", 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("0+20", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make("0+10", 200L)));
+    }
+
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(
+                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                        .withKeySerde(Serdes.String())
+                        .withValueSerde(Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, Long>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), 1L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), 2L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), 1L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), 1L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), 1L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), 2L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), 1L))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                        driver.getTimestampedWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), ValueAndTimestamp.make(1L, 100L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), ValueAndTimestamp.make(2L, 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), ValueAndTimestamp.make(1L, 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), ValueAndTimestamp.make(1L, 500L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), ValueAndTimestamp.make(1L, 150L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), ValueAndTimestamp.make(2L, 200L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), ValueAndTimestamp.make(1L, 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(
+                MockReducer.STRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                        .withKeySerde(Serdes.String())
+                        .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, String>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), "1"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), "1+2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), "2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), "3"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), "20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), "10+20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), "10"))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                        driver.getTimestampedWindowStore("reduced");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 
100)), ValueAndTimestamp.make("1", 100L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 
150)), ValueAndTimestamp.make("1+2", 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 
201)), ValueAndTimestamp.make("2", 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 
500)), ValueAndTimestamp.make("3", 500L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 
150)), ValueAndTimestamp.make("20", 150L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 
200)), ValueAndTimestamp.make("10+20", 200L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 
251)), ValueAndTimestamp.make("10", 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldMaterializeAggregated() {
+        windowedStream.aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                        .withKeySerde(Serdes.String())
+                        .withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, String>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), "0+1"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), "0+1+2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), "0+2"),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), "0+3"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), "0+20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), "0+10+20"),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), "0+10"))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<String>> windowStore =
+                        driver.getTimestampedWindowStore("aggregated");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make("0+1", 100L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make("0+1+2", 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make("0+2", 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make("0+3", 500L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make("0+20", 150L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make("0+10+20", 200L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make("0+10", 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        assertThrows(NullPointerException.class, () ->    windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER));

Review comment:
       nit: extra spaces after the `->`
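       i.e. it could just be `assertThrows(NullPointerException.class, () -> windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER));`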

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Duration;
+import java.util.Objects;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ * A sliding window used for aggregating events.
+ * <p>
+ * Sliding windows are defined by a record's timestamp, with the window size based on the given maximum time difference (inclusive) between
+ * records in the same window and a given window grace period. While the window is sliding over the input data stream, a new window is
+ * created each time a record enters the sliding window or a record drops out of the sliding window.
+ * <p>
+ * Records that arrive after the set grace period has passed will be ignored, i.e., a window is closed when
+ * {@code stream-time > window-end + grace-period}.
+ * <p>
+ * For example, if we have a time difference of 5000ms and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |    8000    |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |    9200    |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |    12400   |
+ * +-----------+-------------+------------+
+ * </pre>
+ * We'd have the following 5 windows:
+ * <ul>
+ *     <li>window {@code [3000;8000]} contains [1] (created when the first record enters the window)</li>
+ *     <li>window {@code [4200;9200]} contains [1,2] (created when the second record enters the window)</li>
+ *     <li>window {@code [7400;12400]} contains [1,2,3] (created when the third record enters the window)</li>
+ *     <li>window {@code [8001;13001]} contains [2,3] (created when the first record drops out of the window)</li>
+ *     <li>window {@code [9201;14201]} contains [3] (created when the second record drops out of the window)</li>
+ * </ul>
+ * <p>
+ * Note that while SlidingWindows are of a fixed size, as are {@link TimeWindows}, the start and end points of the window
+ * depend on when events occur in the stream (i.e., event timestamps), similar to {@link SessionWindows}.
+ * <p>
+ * For time semantics, see {@link TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#windowedBy(SlidingWindows)
+ * @see CogroupedKStream#windowedBy(SlidingWindows)
+ * @see TimestampExtractor
+ */
+
+public final class SlidingWindows {
+
+    /** The size of the windows in milliseconds, defined by the max time difference between records. */
+    private final long timeDifferenceMs;
+
+    /** The grace period in milliseconds. */
+    private final long graceMs;
+
+    private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
+        this.timeDifferenceMs = timeDifferenceMs;
+        this.graceMs = graceMs;
+    }
+
+    /**
+     * Return a window definition with the window size based on the given maximum time difference (inclusive) between
+     * records in the same window and given window grace period. Reject out-of-order events that arrive after {@code grace}.
+     * A window is closed when {@code stream-time > window-end + grace-period}.
+     *
+     * @param timeDifference the max time difference (inclusive) between two records in a window
+     * @param grace the grace period to admit out-of-order events to a window
+     * @return a new window definition
+     * @throws IllegalArgumentException if the specified window size is < 0 or grace <= 0, or either can't be represented as {@code long milliseconds}

Review comment:
       This comment needs to be updated, looks like we do allow a grace period 
of zero in the code/tests
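       e.g. something like: `@throws IllegalArgumentException if the specified window size is < 0 or grace < 0, or either can't be represented as {@code long milliseconds}`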

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java
##########
@@ -145,6 +146,11 @@ public void shouldNotHaveNullWindowOnWindowedBySession() {
         cogroupedStream.windowedBy((SessionWindows) null);
     }
 
+    @Test(expected = NullPointerException.class)

Review comment:
        `assertThrows` 🙂 
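        i.e. something like `assertThrows(NullPointerException.class, () -> cogroupedStream.windowedBy((SessionWindows) null));`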

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private boolean reverseIteratorImplemented = false;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                    threadId,
+                    context.taskId().toString(),
+                    internalProcessorContext.currentNode().name(),
+                    metrics
+            );
+            //catch unsupported operation error
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                        "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                        value, context().topic(), context().partition(), context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+            if (reverseIteratorImplemented) {
+                processReverse(key, value);
+            } else {
+                processInOrder(key, value);
+            }
+        }
+
+        public void processReverse(final K key, final V value) {
+
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
+            //store the start times of windows we find as we iterate
+            final HashSet<Long> windowStartTimes = new HashSet<>();
+            //aggregates that will go in the current record's left/right window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //whether the current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            try (
+                    final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                            key,
+                            key,
+                            timestamp - 2 * windows.timeDifferenceMs(),
+                            timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                //whether we've already seen the window with the closest start time to the record
+                boolean foundRightWinAgg = false;
+                //whether we've already seen the window with the closest end time to the record
+                boolean foundLeftWinAgg = false;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+
+                    //determine if the current record's right window exists; will be true at most once, on the first pass
+                    if (next.key.window().start() == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                        continue;
+                    } else if (next.key.window().end() > timestamp) {
+                        if (!foundRightWinAgg) {
+                            foundRightWinAgg = true;
+                            rightWinAgg = next.value;
+                        }
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        continue;
+                    } else if (next.key.window().end() == timestamp) {
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                        leftWinAlreadyCreated = true;
+                        continue;
+                    } else {
+                        if (!foundLeftWinAgg) {
+                            leftWinAgg = next.value;
+                            foundLeftWinAgg = true;
+                        }
+                        //if it's a left window, there is a record at this window's end time that may need a corresponding right window
+                        if (isLeftWindow(next)) {
+                            final long rightWinStart = next.key.window().end() + 1;
+                            if (!windowStartTimes.contains(rightWinStart)) {
+                                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
+                                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+                            }
+                            break;
+                        }
+                    }
+                }
+            }
+            //create the left window of the current record if it wasn't found in the store
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                //a non-null leftWinAgg within the time difference confirms that the left window contains more than the current record
+                if (leftWinAgg != null && leftWinAgg.timestamp() < timestamp && leftWinAgg.timestamp() > timestamp - windows.timeDifferenceMs()) {
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
+                } else {
+                    //the left window contains only the current record
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+                }
+                final TimeWindow window = new TimeWindow(Math.max(0, timestamp - windows.timeDifferenceMs()), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+            //create the right window for the current record, if need be
+            if (!rightWinAlreadyCreated && rightWinAgg != null && rightWinAgg.timestamp() > timestamp) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+            }
+        }
+
+        public void processInOrder(final K key, final V value) {
+
+            final long timestamp = context().timestamp();
+            //don't process records that don't fall within a full sliding window
+            if (timestamp < windows.timeDifferenceMs()) {

Review comment:
       https://github.com/apache/kafka/pull/9157

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private TimeWindowedKStream<String, String> windowedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        windowedStream = stream
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L)));
+    }
+
+    @Test
+    public void shouldCountSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .count()
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make(1L, 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make(2L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make(1L, 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make(2L, 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make(1L, 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make(1L, 200L)));
+    }
+
+    @Test
+    public void shouldReduceSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make("1", 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make("2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("1+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make("3", 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make("10+20", 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("20", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make("10", 200L)));
+    }
+
+    @Test
+    public void shouldAggregateSlidingWindows() {
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        windowedStream
+                .aggregate(
+                        MockInitializer.STRING_INIT,
+                        MockAggregator.TOSTRING_ADDER,
+                        Materialized.with(Serdes.String(), Serdes.String()))
+                .toStream()
+                .process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+        }
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+                equalTo(ValueAndTimestamp.make("0+1", 100L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+                equalTo(ValueAndTimestamp.make("0+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("0+1+2", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+                equalTo(ValueAndTimestamp.make("0+3", 500L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+                equalTo(ValueAndTimestamp.make("0+10+20", 200L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(50L, 150L))),
+                equalTo(ValueAndTimestamp.make("0+20", 150L)));
+        assertThat(
+                supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+                        .get(new Windowed<>("2", new TimeWindow(151L, 251L))),
+                equalTo(ValueAndTimestamp.make("0+10", 200L)));
+    }
+
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(
+                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                        .withKeySerde(Serdes.String())
+                        .withValueSerde(Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            processData(driver);
+            {
+                final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, Long>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), 1L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), 2L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), 1L),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), 1L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), 1L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), 2L),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), 1L))));
+            }
+            {
+                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
+                        driver.getTimestampedWindowStore("count-store");
+                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
+                        StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
+
+                assertThat(data, equalTo(Arrays.asList(
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 100)), ValueAndTimestamp.make(1L, 100L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(50, 150)), ValueAndTimestamp.make(2L, 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(101, 201)), ValueAndTimestamp.make(1L, 150L)),
+                        KeyValue.pair(new Windowed<>("1", new TimeWindow(400, 500)), ValueAndTimestamp.make(1L, 500L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(50, 150)), ValueAndTimestamp.make(1L, 150L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(100, 200)), ValueAndTimestamp.make(2L, 200L)),
+                        KeyValue.pair(new Windowed<>("2", new TimeWindow(151, 251)), ValueAndTimestamp.make(1L, 200L)))));
+            }
+        }
+    }
+
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(
+                MockReducer.STRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
Review comment:
       Can we add some tests to verify the other Materialized properties, specifically the retention? You can just pick a single operator (e.g. `reduce`) and write a test to make sure data is available (only) within the retention period.
       Also, do you think we can write a test to verify that the default retention is as expected when we don't specify it?
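       For the retention test, a rough (untested) sketch of what I had in mind; the test name, the 1100ms retention, and the timestamps are all made up for illustration. Note retention has to be at least timeDifference + grace (100ms + 1000ms with the windows used in this test class):

           @Test
           public void shouldOnlyKeepRecordsWithinRetentionPeriod() {
               windowedStream.reduce(
                       MockReducer.STRING_ADDER,
                       Materialized.<String, String, WindowStore<Bytes, byte[]>>as("retained")
                               .withKeySerde(Serdes.String())
                               .withValueSerde(Serdes.String())
                               .withRetention(ofMillis(1100L)));

               try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                   final TestInputTopic<String, String> inputTopic =
                           driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
                   inputTopic.pipeInput("1", "1", 100L);
                   //advance stream-time far enough past the retention period that the first window expires
                   inputTopic.pipeInput("1", "2", 5000L);

                   final WindowStore<String, String> windowStore = driver.getWindowStore("retained");
                   //the window from the record at t=100 should no longer be returned
                   final List<KeyValue<Windowed<String>, String>> data =
                           StreamsTestUtils.toList(windowStore.fetch("1", "1", ofEpochMilli(0), ofEpochMilli(200L)));
                   assertThat(data.isEmpty(), equalTo(true));
               }
           }

       A similar test without `withRetention` could then pin down the default.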




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

