[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r479362665 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (TimestampedWindowStore) context.getStateStore(storeName); +tupleForwarder = new TimestampedTupleFo
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r478753568 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (TimestampedWindowStore) context.getStateStore(storeName); +tupleForwarder = new TimestampedTupleFo
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r478730796 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (TimestampedWindowStore) context.getStateStore(storeName); +tupleForwarder = new TimestampedTupleFo
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r478728829 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (TimestampedWindowStore) context.getStateStore(storeName); +tupleForwarder = new TimestampedTupleFo
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r478726427 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (TimestampedWindowStore) context.getStateStore(storeName); +tupleForwarder = new TimestampedTupleFo
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r472440045 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java ## @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class SlidingWindowedKStreamImplTest { + +private static final String TOPIC = "input"; +private final StreamsBuilder builder = new StreamsBuilder(); +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private TimeWindowedKStream windowedStream; + +@Before +public void before() { +final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); +windowedStream = stream. +groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L))); +} + +@Test +public void shouldCountSlidingWindows() { +final MockProcessorSupplier, Long> supplier = new MockProcessorSupplier<>(); +windowedStream +.count() +.toStream() +.process(supplier); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { +processData(driver); +} +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(0L, 100L))), +equalTo(ValueAndTimestamp.make(1L, 100L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(101L, 201L))), +equalTo(ValueAndTimestamp.make(1L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(50L, 150L))), +equalTo(ValueAndTimestamp.make(2L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(400L, 500L))), +equalTo(ValueAndTimestamp.make(1L, 500L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("2", new TimeWindow(100L, 200L))), +equalTo(ValueAndTimestamp.make(2L, 200L))); +assertThat( +su
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r472436400 ## File path: checkstyle/suppressions.xml ## @@ -167,6 +167,9 @@ + Review comment: We don't! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r468796354 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java ## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class SlidingWindowedKStreamImplTest { + +private static final String TOPIC = "input"; +private final StreamsBuilder builder = new StreamsBuilder(); +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private TimeWindowedKStream windowedStream; + +@Before +public void before() { +final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); +windowedStream = stream. +groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L))); +} + +@Test +public void shouldCountSlidingWindows() { +final MockProcessorSupplier, Long> supplier = new MockProcessorSupplier<>(); +windowedStream +.count() +.toStream() +.process(supplier); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { +processData(driver); +} +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(0L, 100L))), +equalTo(ValueAndTimestamp.make(1L, 100L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(101L, 201L))), +equalTo(ValueAndTimestamp.make(1L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(50L, 150L))), +equalTo(ValueAndTimestamp.make(2L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(400L, 500L))), +equalTo(ValueAndTimestamp.make(1L, 500L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("2", new TimeWindow(100L, 200L))), +equalTo(ValueAndTimestamp.make(2L, 200L))); +assertThat( +supplier.t
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r468690429 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java ## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class SlidingWindowedKStreamImplTest { + +private static final String TOPIC = "input"; +private final StreamsBuilder builder = new StreamsBuilder(); +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private TimeWindowedKStream windowedStream; + +@Before +public void before() { +final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); +windowedStream = stream. +groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L))); +} + +@Test +public void shouldCountSlidingWindows() { +final MockProcessorSupplier, Long> supplier = new MockProcessorSupplier<>(); +windowedStream +.count() +.toStream() +.process(supplier); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { +processData(driver); +} +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(0L, 100L))), +equalTo(ValueAndTimestamp.make(1L, 100L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(101L, 201L))), +equalTo(ValueAndTimestamp.make(1L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(50L, 150L))), +equalTo(ValueAndTimestamp.make(2L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(400L, 500L))), +equalTo(ValueAndTimestamp.make(1L, 500L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("2", new TimeWindow(100L, 200L))), +equalTo(ValueAndTimestamp.make(2L, 200L))); +assertThat( +supplier.t
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r468653951 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java ## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class SlidingWindowedKStreamImplTest { + +private static final String TOPIC = "input"; +private final StreamsBuilder builder = new StreamsBuilder(); +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private TimeWindowedKStream windowedStream; + +@Before +public void before() { +final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); +windowedStream = stream. +groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L))); +} + +@Test +public void shouldCountSlidingWindows() { +final MockProcessorSupplier, Long> supplier = new MockProcessorSupplier<>(); +windowedStream +.count() +.toStream() +.process(supplier); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { +processData(driver); +} +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(0L, 100L))), +equalTo(ValueAndTimestamp.make(1L, 100L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(101L, 201L))), +equalTo(ValueAndTimestamp.make(1L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(50L, 150L))), +equalTo(ValueAndTimestamp.make(2L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(400L, 500L))), +equalTo(ValueAndTimestamp.make(1L, 500L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("2", new TimeWindow(100L, 200L))), +equalTo(ValueAndTimestamp.make(2L, 200L))); +assertThat( +supplier.t
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r467260318 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Instant; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r466425932 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -132,16 +135,19 @@ final boolean stateCreated, final StoreBuilder storeBuilder, final Windows windows, + final SlidingWindows slidingWindows, final SessionWindows sessionWindows, final Merger sessionMerger) { final ProcessorSupplier kStreamAggregate; -if (windows == null && sessionWindows == null) { +if (windows == null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator); -} else if (windows != null && sessionWindows == null) { +} else if (windows != null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator); -} else if (windows == null && sessionMerger != null) { +} else if (windows == null && slidingWindows != null && sessionWindows == null) { +kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator); +} else if (windows == null && slidingWindows == null && sessionMerger != null) { Review comment: I'm happy to do a PR! Looking into it now though, `getStatefulProcessorNode` is called by `build`, so I think to really separate it by type we'd need a different `build` _and_ `StatefulProcessorNode`, otherwise we'd be moving the null checks into `build` and then calling the correct `getStatefulProcessorNode`, which does't seem to really fix anything. Thoughts? It's easy to create new `build` functions but I figured this might fall under not avoiding code duplication :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r466070317 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception { ))); } + + +@Test +public void shouldReduceSlidingWindows() throws Exception { +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); +final long firstBatchTimestamp = 2000L; +final long timeDifference = 1000L; +produceMessages(firstBatchTimestamp); +final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2; +produceMessages(secondBatchTimestamp); +final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L; +produceMessages(thirdBatchTimestamp); + +final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); +groupedStream + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L))) +.reduce(reducer) +.toStream() +.to(outputTopic, Produced.with(windowedSerde, Serdes.String())); + +startStreams(); + +final List, String>> windowedOutput = receiveMessages( +new TimeWindowedDeserializer<>(), +new StringDeserializer(), +String.class, +25); + +// read from ConsoleConsumer +final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer( +new TimeWindowedDeserializer(), +new StringDeserializer(), +String.class, +25, +true); + +final Comparator, String>> comparator = +Comparator.comparing((KeyValueTimestamp, String> o) -> o.key().key()) +.thenComparing(KeyValueTimestamp::value); + +windowedOutput.sort(comparator); +final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference; +final long firstBatchRightWindow = firstBatchTimestamp + 1; +final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference; +final long secondBatchRightWindow = secondBatchTimestamp + 1; +final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference; + +final List, String>> expectResult = Arrays.asList( +new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp), Review comment: Created a ticket for this here: https://issues.apache.org/jira/browse/KAFKA-10366 let me know if the description isn't clear This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r466042505 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception { ))); } + + +@Test +public void shouldReduceSlidingWindows() throws Exception { +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); +final long firstBatchTimestamp = 2000L; +final long timeDifference = 1000L; +produceMessages(firstBatchTimestamp); +final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2; +produceMessages(secondBatchTimestamp); +final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L; +produceMessages(thirdBatchTimestamp); Review comment: That should be covered in `KStreamSlidingWindowAggregateTest`, which goes through more of the edge cases using the `TopologyTestDriver` which is a little easier to manipulate than this set up This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465751597 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (Timesta
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465756356 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (Timesta
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465348679 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.EqualityCheck.verifyEquality; +import static org.apache.kafka.streams.EqualityCheck.verifyInEquality; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +@SuppressWarnings("deprecation") +public class SlidingWindowsTest { + +private static final long ANY_SIZE = 123L; + +@Test +public void shouldSetWindowSize() { +assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(3)).timeDifferenceMs()); +} + +@Test(expected = IllegalArgumentException.class) +public void windowSizeMustNotBeZero() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), ofMillis(5)); +} + +@Test(expected = IllegalArgumentException.class) +public void windowSizeMustNotBeNegative() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5)); +} + +@Test +public void shouldSetGracePeriod() { +assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(ANY_SIZE)).gracePeriodMs()); +} + +@Test(expected = IllegalArgumentException.class) +public void graceMustNotBeNegative() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(-1)); +} + +@Test +public void gracePeriodShouldEnforceBoundaries() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3L), ofMillis(0L)); + +try { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3L), ofMillis(-1L)); +fail("should not accept negatives"); +} catch (final IllegalArgumentException e) { +//expected +} +} + +@Test +public void equalsAndHashcodeShouldBeValidForPositiveCases() { +verifyEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(3)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(3))); + +verifyEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(1)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(1))); + +verifyEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(4))); + +} + +@Test +public void equalsAndHashcodeShouldBeValidForNegativeCases() { + + verifyInEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(2)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(1))); + + verifyInEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(9)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(4))); + + +verifyInEquality( +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(4), ofMillis(2)), +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), ofMillis(2)) +); + +assertNotEquals( Review comment: They look to be fairly similar, and it seems like the tests use both consistently. `verifyInEquality` seems to be more thorough, and to be consistent with the above `equalsAndHashcodeShouldBeValidForPositiveCases` I think I'll use `verifyInEquality` for this test, unless someone has an objection This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465339322 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Instant; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465336976 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Instant; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465334577 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java ## @@ -275,6 +275,15 @@ */ TimeWindowedCogroupedKStream windowedBy(final Windows windows); +/** + * Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding + * windowed aggregations. + * + * @param windows the specification of the aggregation {@link SlidingWindows} + * @return an instance of {@link TimeWindowedCogroupedKStream} + */ +TimeWindowedCogroupedKStream windowedBy(final SlidingWindows windows); Review comment: We could, but it would only pull out 3ish classes and not very many lines, so I don't think it would make this PR feel much smaller This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465219522 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception { ))); } + + +@Test +public void shouldReduceSlidingWindows() throws Exception { +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); +final long firstBatchTimestamp = 2000L; +final long timeDifference = 1000L; +produceMessages(firstBatchTimestamp); +final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2; +produceMessages(secondBatchTimestamp); +final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L; +produceMessages(thirdBatchTimestamp); Review comment: To clarify, are you wanting to add records that would fall after the third batch _outside_ of all the existing windows, or so that it will fall into the third batch's windows but not the second batch's windows? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465205088 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ## @@ -221,7 +221,6 @@ public void shouldTransitionToRunningOnStart() throws Exception { globalStreamThread.shutdown(); } -@Test Review comment: Whoops, not on purpose. Thanks for the check This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465194845 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.EqualityCheck.verifyEquality; +import static org.apache.kafka.streams.EqualityCheck.verifyInEquality; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +@SuppressWarnings("deprecation") +public class SlidingWindowsTest { + +private static final long ANY_SIZE = 123L; + +@Test +public void shouldSetWindowSize() { +assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(3)).timeDifferenceMs()); +} + +@Test(expected = IllegalArgumentException.class) +public void windowSizeMustNotBeZero() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), ofMillis(5)); +} + +@Test(expected = IllegalArgumentException.class) +public void windowSizeMustNotBeNegative() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5)); +} + +@Test +public void shouldSetGracePeriod() { +assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(ANY_SIZE)).gracePeriodMs()); +} + +@Test(expected = IllegalArgumentException.class) +public void graceMustNotBeNegative() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(-1)); +} + +@Test +public void gracePeriodShouldEnforceBoundaries() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3L), ofMillis(0L)); Review comment: Re-examining the test, it looks like it does the same thing as `gracePeriodMustNotBeNegative()` so I think the test can be removed entirely This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465193735 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.EqualityCheck.verifyEquality; +import static org.apache.kafka.streams.EqualityCheck.verifyInEquality; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +@SuppressWarnings("deprecation") +public class SlidingWindowsTest { + +private static final long ANY_SIZE = 123L; + +@Test +public void shouldSetWindowSize() { +assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(3)).timeDifferenceMs()); +} + +@Test(expected = IllegalArgumentException.class) +public void windowSizeMustNotBeZero() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), ofMillis(5)); +} + +@Test(expected = IllegalArgumentException.class) +public void windowSizeMustNotBeNegative() { +SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5)); Review comment: I think just confirming that the correct error will be thrown when someone sets a `timeDifference` we don't want. I'll update all the `windowSize` to be `timeDifference` and I agree, no need to check that it isn't 0 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465191505 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception { ))); } + + +@Test +public void shouldReduceSlidingWindows() throws Exception { +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); +final long firstBatchTimestamp = 2000L; +final long timeDifference = 1000L; +produceMessages(firstBatchTimestamp); +final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2; +produceMessages(secondBatchTimestamp); +final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L; +produceMessages(thirdBatchTimestamp); + +final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); +groupedStream + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L))) +.reduce(reducer) +.toStream() +.to(outputTopic, Produced.with(windowedSerde, Serdes.String())); + +startStreams(); + +final List, String>> windowedOutput = receiveMessages( +new TimeWindowedDeserializer<>(), +new StringDeserializer(), +String.class, +25); + +// read from ConsoleConsumer +final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer( +new TimeWindowedDeserializer(), +new StringDeserializer(), +String.class, +25, +true); + +final Comparator, String>> comparator = +Comparator.comparing((KeyValueTimestamp, String> o) -> o.key().key()) +.thenComparing(KeyValueTimestamp::value); + +windowedOutput.sort(comparator); +final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference; +final long firstBatchRightWindow = firstBatchTimestamp + 1; +final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference; +final long secondBatchRightWindow = secondBatchTimestamp + 1; +final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference; + +final List, String>> expectResult = Arrays.asList( +new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp), Review comment: There seems to be a bug in `TimeWindowedDeserializer` related to [this ticket](https://issues.apache.org/jira/browse/KAFKA-4468) that ends up setting the windowSize to `Long.MAX_VALUE`. For the purposes of testing, I don't think having it as the max value is totally awful (just somewhat awful) and the window end calculations are all tested in a different set of tests done through topology driver. I'll make a ticket for this bug and try to get it fixed when I'm done with testing This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465111665 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import java.time.Duration; +import java.util.Objects; +import java.util.Set; +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME; +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME; + +public class SlidingWindowedKStreamImpl extends AbstractStream implements TimeWindowedKStream { +private final SlidingWindows windows; +private final GroupedStreamAggregateBuilder aggregateBuilder; + +SlidingWindowedKStreamImpl(final SlidingWindows windows, + final InternalStreamsBuilder builder, + final Set subTopologySourceNodes, + final String name, + final Serde keySerde, + final Serde valueSerde, + final GroupedStreamAggregateBuilder aggregateBuilder, + final StreamsGraphNode streamsGraphNode) { +super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder); +this.windows = Objects.requireNonNull(windows, "windows can't be null"); +this.aggregateBuilder = aggregateBuilder; +} + +@Override +public KTable, Long> count() { +return count(NamedInternal.empty()); +} + +@Override +public KTable, Long> count(final Named named) { +return doCount(named, Materialized.with(keySerde, Serdes.Long())); +} + +@Override +public KTable, Long> count(final Materialized> materialized) { +return count(NamedInternal.empty(), materialized); +} + +@Override +public KTable, Long> count(final Named named, final Materialized> materialized) { +Objects.requireNonNull(materialized, "materialized can't be null"); +return doCount(named, materialized); +} + +private KTable, Long> doCount(final Named named, + final Materialized> materialized) { +final MaterializedInternal> materializedInternal = +new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + +if (materializedInternal.keySerde() == null) { +materializedInternal.withKeySerde(keySerde); +} +if (materializedInternal.valueSerde() == null) { +materializedInternal.withValueSerde(Serdes.Long()); +} + +final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + +return aggregateBuilder.build( +new NamedInternal(aggregateName), +materialize(materializedInternal), +new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465094963 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception { ))); } + + +@Test +public void shouldReduceSlidingWindows() throws Exception { +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); +final long firstBatchTimestamp = 2000L; +final long timeDifference = 1000L; +produceMessages(firstBatchTimestamp); +final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2; +produceMessages(secondBatchTimestamp); +final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L; +produceMessages(thirdBatchTimestamp); + +final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); +groupedStream + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L))) +.reduce(reducer) +.toStream() +.to(outputTopic, Produced.with(windowedSerde, Serdes.String())); + +startStreams(); + +final List, String>> windowedOutput = receiveMessages( +new TimeWindowedDeserializer<>(), +new StringDeserializer(), +String.class, +25); + +// read from ConsoleConsumer +final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer( +new TimeWindowedDeserializer(), +new StringDeserializer(), +String.class, +25, +true); + +final Comparator, String>> comparator = +Comparator.comparing((KeyValueTimestamp, String> o) -> o.key().key()) +.thenComparing(KeyValueTimestamp::value); + +windowedOutput.sort(comparator); +final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference; +final long firstBatchRightWindow = firstBatchTimestamp + 1; +final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference; +final long secondBatchRightWindow = secondBatchTimestamp + 1; +final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference; + +final List, String>> expectResult = Arrays.asList( +new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp), Review comment: Because the windows are sorted, the windows created by each record aren't consecutive, so I added comments describing each window, but only did it for A since all the other keys are processed the exact same way. Sample comment: `// A @ secondBatchTimestamp left window created when A @ secondBatchTimestamp processed` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465079294 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import java.time.Duration; +import java.util.Objects; +import java.util.Set; +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME; +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME; + +public class SlidingWindowedKStreamImpl extends AbstractStream implements TimeWindowedKStream { +private final SlidingWindows windows; +private final GroupedStreamAggregateBuilder aggregateBuilder; + +SlidingWindowedKStreamImpl(final SlidingWindows windows, + final InternalStreamsBuilder builder, + final Set subTopologySourceNodes, + final String name, + final Serde keySerde, + final Serde valueSerde, + final GroupedStreamAggregateBuilder aggregateBuilder, + final StreamsGraphNode streamsGraphNode) { +super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder); +this.windows = Objects.requireNonNull(windows, "windows can't be null"); +this.aggregateBuilder = aggregateBuilder; +} + +@Override +public KTable, Long> count() { +return count(NamedInternal.empty()); +} + +@Override +public KTable, Long> count(final Named named) { +return doCount(named, Materialized.with(keySerde, Serdes.Long())); +} + +@Override +public KTable, Long> count(final Materialized> materialized) { +return count(NamedInternal.empty(), materialized); +} + +@Override +public KTable, Long> count(final Named named, final Materialized> materialized) { +Objects.requireNonNull(materialized, "materialized can't be null"); +return doCount(named, materialized); +} + +private KTable, Long> doCount(final Named named, + final Materialized> materialized) { +final MaterializedInternal> materializedInternal = +new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + +if (materializedInternal.keySerde() == null) { +materializedInternal.withKeySerde(keySerde); +} +if (materializedInternal.valueSerde() == null) { +materializedInternal.withValueSerde(Serdes.Long()); +} + +final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + +return aggregateBuilder.build( +new NamedInternal(aggregateName), +materialize(materializedInternal), +new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465073439 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -132,16 +135,19 @@ final boolean stateCreated, final StoreBuilder storeBuilder, final Windows windows, + final SlidingWindows slidingWindows, final SessionWindows sessionWindows, final Merger sessionMerger) { final ProcessorSupplier kStreamAggregate; -if (windows == null && sessionWindows == null) { +if (windows == null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator); -} else if (windows != null && sessionWindows == null) { +} else if (windows != null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator); -} else if (windows == null && sessionMerger != null) { +} else if (windows == null && slidingWindows != null && sessionWindows == null) { +kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator); +} else if (windows == null && slidingWindows == null && sessionMerger != null) { Review comment: Would checking both be redundant? It looks like the method that ultimately calls this one will check that sessionMerger is not null for session windows, so I think either both of these will be null or neither will be null This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463092579 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463091437 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463088582 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463088582 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463083821 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463083346 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463078058 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463077196 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463076565 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463073842 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463063406 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463056420 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463047237 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463046306 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463039985 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java ## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +import java.time.Duration; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME; +import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME; + +public class SlidingWindowedKStreamImpl extends AbstractStream implements TimeWindowedKStream { +private final SlidingWindows windows; +private final GroupedStreamAggregateBuilder aggregateBuilder; + +SlidingWindowedKStreamImpl(final SlidingWindows windows, +final InternalStreamsBuilder builder, +final Set subTopologySourceNodes, +final String name, +final Serde keySerde, +final Serde valueSerde, +final GroupedStreamAggregateBuilder aggregateBuilder, +final StreamsGraphNode streamsGraphNode) { +super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder); +this.windows = Objects.requireNonNull(windows, "windows can't be null"); +this.aggregateBuilder = aggregateBuilder; +} + +@Override +public KTable, Long> count() { +return count(NamedInternal.empty()); +} + +@Override +public KTable, Long> count(final Named named) { +return doCount(named, Materialized.with(keySerde, Serdes.Long())); +} + + +@Override +public KTable, Long> count(final Materialized> materialized) { +return count(NamedInternal.empty(), materialized); +} + +@Override +public KTable, Long> count(final Named named, final Materialized> materialized) { +Objects.requireNonNull(materialized, "materialized can't be null"); + +// TODO: remove this when we do a topology-incompatible release +// we used to burn a topology name here, so we have to keep doing it for compatibility +if (new MaterializedInternal<>(materialized).storeName() == null) { +builder.newStoreName(AGGREGATE_NAME); +} + +return doCount(named, materialized); +} + +private KTable, Long> doCount(final Named named, + final Materialized> materialized) { +final MaterializedInternal> materializedInternal = +new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); + +if (materializedInternal.keySerde() == null) { +materializedInternal.withKeySerde(keySerde); +} +if (materializedInternal.valueSerde() == null) { +materializedInternal.withValueSerde(Serdes.Long()); +} + +final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + +return aggregateBu
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463036048 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.HashSet; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { Review comment: done! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463026843 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -132,16 +135,19 @@ final boolean stateCreated, final StoreBuilder storeBuilder, final Windows windows, + final SlidingWindows slidingWindows, final SessionWindows sessionWindows, final Merger sessionMerger) { final ProcessorSupplier kStreamAggregate; -if (windows == null && sessionWindows == null) { +if (windows == null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator); -} else if (windows != null && sessionWindows == null) { +} else if (windows != null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator); -} else if (windows == null && sessionMerger != null) { +} else if (windows == null && slidingWindows != null && sessionWindows == null) { +kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator); +} else if (windows == null && slidingWindows == null && sessionMerger != null) { Review comment: The original just had the check for `sessionMerger != null`, are there scenarios where the sessionMerger would be null but the sessionWindows wouldn't? I did think it was kind of inconsistent to check 'sessionMerger' just that one time and check 'sessionWindows' the other times so maybe it was a mistake This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org