[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-28 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r479362665



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer initializer,
+ final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore) 
context.getStateStore(storeName);
+tupleForwarder = new 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-27 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r478753568



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer initializer,
+ final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore) 
context.getStateStore(storeName);
+tupleForwarder = new 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-27 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r478730796



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer initializer,
+ final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore) 
context.getStateStore(storeName);
+tupleForwarder = new 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-27 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r478728829



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer initializer,
+ final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore) 
context.getStateStore(storeName);
+tupleForwarder = new 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-27 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r478726427



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer initializer,
+ final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore) 
context.getStateStore(storeName);
+tupleForwarder = new 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-18 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r472440045



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+private static final String TOPIC = "input";
+private final StreamsBuilder builder = new StreamsBuilder();
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private TimeWindowedKStream windowedStream;
+
+@Before
+public void before() {
+final KStream stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
+windowedStream = stream.
+groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(1000L)));
+}
+
+@Test
+public void shouldCountSlidingWindows() {
+final MockProcessorSupplier, Long> supplier = new 
MockProcessorSupplier<>();
+windowedStream
+.count()
+.toStream()
+.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+processData(driver);
+}
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+equalTo(ValueAndTimestamp.make(1L, 100L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+equalTo(ValueAndTimestamp.make(1L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+equalTo(ValueAndTimestamp.make(2L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+equalTo(ValueAndTimestamp.make(1L, 500L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+equalTo(ValueAndTimestamp.make(2L, 200L)));
+assertThat(
+

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-18 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r472436400



##
File path: checkstyle/suppressions.xml
##
@@ -167,6 +167,9 @@
 
 
+

Review comment:
   We don't!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-11 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r468796354



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+private static final String TOPIC = "input";
+private final StreamsBuilder builder = new StreamsBuilder();
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private TimeWindowedKStream windowedStream;
+
+@Before
+public void before() {
+final KStream stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
+windowedStream = stream.
+groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(1000L)));
+}
+
+@Test
+public void shouldCountSlidingWindows() {
+final MockProcessorSupplier, Long> supplier = new 
MockProcessorSupplier<>();
+windowedStream
+.count()
+.toStream()
+.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+processData(driver);
+}
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+equalTo(ValueAndTimestamp.make(1L, 100L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+equalTo(ValueAndTimestamp.make(1L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+equalTo(ValueAndTimestamp.make(2L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+equalTo(ValueAndTimestamp.make(1L, 500L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+equalTo(ValueAndTimestamp.make(2L, 200L)));
+assertThat(
+

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-11 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r468690429



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+private static final String TOPIC = "input";
+private final StreamsBuilder builder = new StreamsBuilder();
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private TimeWindowedKStream windowedStream;
+
+@Before
+public void before() {
+final KStream stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
+windowedStream = stream.
+groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(1000L)));
+}
+
+@Test
+public void shouldCountSlidingWindows() {
+final MockProcessorSupplier, Long> supplier = new 
MockProcessorSupplier<>();
+windowedStream
+.count()
+.toStream()
+.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+processData(driver);
+}
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+equalTo(ValueAndTimestamp.make(1L, 100L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+equalTo(ValueAndTimestamp.make(1L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+equalTo(ValueAndTimestamp.make(2L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+equalTo(ValueAndTimestamp.make(1L, 500L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+equalTo(ValueAndTimestamp.make(2L, 200L)));
+assertThat(
+

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-11 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r468653951



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+private static final String TOPIC = "input";
+private final StreamsBuilder builder = new StreamsBuilder();
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private TimeWindowedKStream windowedStream;
+
+@Before
+public void before() {
+final KStream stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
+windowedStream = stream.
+groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(1000L)));
+}
+
+@Test
+public void shouldCountSlidingWindows() {
+final MockProcessorSupplier, Long> supplier = new 
MockProcessorSupplier<>();
+windowedStream
+.count()
+.toStream()
+.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+processData(driver);
+}
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+equalTo(ValueAndTimestamp.make(1L, 100L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+equalTo(ValueAndTimestamp.make(1L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+equalTo(ValueAndTimestamp.make(2L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+equalTo(ValueAndTimestamp.make(1L, 500L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+equalTo(ValueAndTimestamp.make(2L, 200L)));
+assertThat(
+

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-07 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r467260318



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Instant;
+import java.util.HashSet;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+   

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-06 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r466425932



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -132,16 +135,19 @@

 final boolean stateCreated,

 final StoreBuilder storeBuilder,

 final Windows windows,
+   
 final SlidingWindows slidingWindows,

 final SessionWindows sessionWindows,

 final Merger sessionMerger) {
 
 final ProcessorSupplier kStreamAggregate;
 
-if (windows == null && sessionWindows == null) {
+if (windows == null && slidingWindows == null && sessionWindows == 
null) {
 kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), 
initializer, aggregator);
-} else if (windows != null && sessionWindows == null) {
+} else if (windows != null && slidingWindows == null && sessionWindows 
== null) {
 kStreamAggregate = new KStreamWindowAggregate<>(windows, 
storeBuilder.name(), initializer, aggregator);
-} else if (windows == null && sessionMerger != null) {
+} else if (windows == null && slidingWindows != null && sessionWindows 
== null) {
+kStreamAggregate = new 
KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), 
initializer, aggregator);
+} else if (windows == null && slidingWindows == null && sessionMerger 
!= null) {

Review comment:
   I'm happy to do a PR! Looking into it now though, 
`getStatefulProcessorNode` is called by `build`, so I think to really separate 
it by type we'd need a different `build` _and_ `StatefulProcessorNode`, 
otherwise we'd be moving the null checks into `build` and then calling the 
correct `getStatefulProcessorNode`, which does't seem to really fix anything. 
Thoughts? It's easy to create new `build` functions but I figured this might 
fall under not avoiding code duplication :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-05 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r466070317



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
 )));
 }
 
+
+
+@Test
+public void shouldReduceSlidingWindows() throws Exception {
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+final long firstBatchTimestamp = 2000L;
+final long timeDifference = 1000L;
+produceMessages(firstBatchTimestamp);
+final long secondBatchTimestamp = firstBatchTimestamp + timeDifference 
/ 2;
+produceMessages(secondBatchTimestamp);
+final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference 
- 100L;
+produceMessages(thirdBatchTimestamp);
+
+final Serde> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class);
+groupedStream
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
+.reduce(reducer)
+.toStream()
+.to(outputTopic, Produced.with(windowedSerde, 
Serdes.String()));
+
+startStreams();
+
+final List, String>> windowedOutput 
= receiveMessages(
+new TimeWindowedDeserializer<>(),
+new StringDeserializer(),
+String.class,
+25);
+
+// read from ConsoleConsumer
+final String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
+new TimeWindowedDeserializer(),
+new StringDeserializer(),
+String.class,
+25,
+true);
+
+final Comparator, String>> 
comparator =
+Comparator.comparing((KeyValueTimestamp, 
String> o) -> o.key().key())
+.thenComparing(KeyValueTimestamp::value);
+
+windowedOutput.sort(comparator);
+final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+final long firstBatchRightWindow = firstBatchTimestamp + 1;
+final long secondBatchLeftWindow = secondBatchTimestamp - 
timeDifference;
+final long secondBatchRightWindow = secondBatchTimestamp + 1;
+final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+final List, String>> expectResult = 
Arrays.asList(
+new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),

Review comment:
   Created a ticket for this here: 
https://issues.apache.org/jira/browse/KAFKA-10366 let me know if the 
description isn't clear





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-05 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r466042505



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
 )));
 }
 
+
+
+@Test
+public void shouldReduceSlidingWindows() throws Exception {
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+final long firstBatchTimestamp = 2000L;
+final long timeDifference = 1000L;
+produceMessages(firstBatchTimestamp);
+final long secondBatchTimestamp = firstBatchTimestamp + timeDifference 
/ 2;
+produceMessages(secondBatchTimestamp);
+final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference 
- 100L;
+produceMessages(thirdBatchTimestamp);

Review comment:
   That should be covered in `KStreamSlidingWindowAggregateTest`, which 
goes through more of the edge cases using the `TopologyTestDriver` which is a 
little easier to manipulate than this set up





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-05 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465751597



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-05 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465756356



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465348679



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
+import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("deprecation")
+public class SlidingWindowsTest {
+
+private static final long ANY_SIZE = 123L;
+
+@Test
+public void shouldSetWindowSize() {
+assertEquals(ANY_SIZE, 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), 
ofMillis(3)).timeDifferenceMs());
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void windowSizeMustNotBeZero() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), ofMillis(5));
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void windowSizeMustNotBeNegative() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5));
+}
+
+@Test
+public void shouldSetGracePeriod() {
+assertEquals(ANY_SIZE, 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(ANY_SIZE)).gracePeriodMs());
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void graceMustNotBeNegative() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(-1));
+}
+
+@Test
+public void gracePeriodShouldEnforceBoundaries() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3L), ofMillis(0L));
+
+try {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3L), 
ofMillis(-1L));
+fail("should not accept negatives");
+} catch (final IllegalArgumentException e) {
+//expected
+}
+}
+
+@Test
+public void equalsAndHashcodeShouldBeValidForPositiveCases() {
+verifyEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(3)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(3)));
+
+verifyEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(1)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(1)));
+
+verifyEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(4)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(4)));
+
+}
+
+@Test
+public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+
+
verifyInEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(2)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(1)));
+
+
verifyInEquality(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(9)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(4)));
+
+
+verifyInEquality(
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(4), 
ofMillis(2)),
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3), 
ofMillis(2))
+);
+
+assertNotEquals(

Review comment:
   They look to be fairly similar, and it seems like the tests use both 
consistently. `verifyInEquality` seems to be more thorough, and to be 
consistent with the above `equalsAndHashcodeShouldBeValidForPositiveCases` I 
think I'll use `verifyInEquality` for this test, unless someone has an objection





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465339322



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Instant;
+import java.util.HashSet;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+   

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465336976



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Instant;
+import java.util.HashSet;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+   

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465334577



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
##
@@ -275,6 +275,15 @@
  */
  TimeWindowedCogroupedKStream windowedBy(final 
Windows windows);
 
+/**
+ * Create a new {@link TimeWindowedCogroupedKStream} instance that can be 
used to perform sliding
+ * windowed aggregations.
+ *
+ * @param windows the specification of the aggregation {@link 
SlidingWindows}
+ * @return an instance of {@link TimeWindowedCogroupedKStream}
+ */
+TimeWindowedCogroupedKStream windowedBy(final SlidingWindows 
windows);

Review comment:
   We could, but it would only pull out 3ish classes and not very many 
lines, so I don't think it would make this PR feel much smaller





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465219522



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
 )));
 }
 
+
+
+@Test
+public void shouldReduceSlidingWindows() throws Exception {
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+final long firstBatchTimestamp = 2000L;
+final long timeDifference = 1000L;
+produceMessages(firstBatchTimestamp);
+final long secondBatchTimestamp = firstBatchTimestamp + timeDifference 
/ 2;
+produceMessages(secondBatchTimestamp);
+final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference 
- 100L;
+produceMessages(thirdBatchTimestamp);

Review comment:
   To clarify, are you wanting to add records that would fall after the 
third batch _outside_ of all the existing windows, or so that it will fall into 
the third batch's windows but not the second batch's windows?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465205088



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##
@@ -221,7 +221,6 @@ public void shouldTransitionToRunningOnStart() throws 
Exception {
 globalStreamThread.shutdown();
 }
 
-@Test

Review comment:
   Whoops, not on purpose. Thanks for the check





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465194845



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
+import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("deprecation")
+public class SlidingWindowsTest {
+
+private static final long ANY_SIZE = 123L;
+
+@Test
+public void shouldSetWindowSize() {
+assertEquals(ANY_SIZE, 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), 
ofMillis(3)).timeDifferenceMs());
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void windowSizeMustNotBeZero() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), ofMillis(5));
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void windowSizeMustNotBeNegative() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5));
+}
+
+@Test
+public void shouldSetGracePeriod() {
+assertEquals(ANY_SIZE, 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(ANY_SIZE)).gracePeriodMs());
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void graceMustNotBeNegative() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(-1));
+}
+
+@Test
+public void gracePeriodShouldEnforceBoundaries() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(3L), ofMillis(0L));

Review comment:
   Re-examining the test, it looks like it does the same thing as 
`gracePeriodMustNotBeNegative()` so I think the test can be removed entirely





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465193735



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
+import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("deprecation")
+public class SlidingWindowsTest {
+
+private static final long ANY_SIZE = 123L;
+
+@Test
+public void shouldSetWindowSize() {
+assertEquals(ANY_SIZE, 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), 
ofMillis(3)).timeDifferenceMs());
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void windowSizeMustNotBeZero() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), ofMillis(5));
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void windowSizeMustNotBeNegative() {
+SlidingWindows.withTimeDifferenceAndGrace(ofMillis(-1), ofMillis(5));

Review comment:
   I think just confirming that the correct error will be thrown when 
someone sets a `timeDifference` we don't want. I'll update all the `windowSize` 
to be `timeDifference` and I agree, no need to check that it isn't 0





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465191505



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
 )));
 }
 
+
+
+@Test
+public void shouldReduceSlidingWindows() throws Exception {
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+final long firstBatchTimestamp = 2000L;
+final long timeDifference = 1000L;
+produceMessages(firstBatchTimestamp);
+final long secondBatchTimestamp = firstBatchTimestamp + timeDifference 
/ 2;
+produceMessages(secondBatchTimestamp);
+final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference 
- 100L;
+produceMessages(thirdBatchTimestamp);
+
+final Serde> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class);
+groupedStream
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
+.reduce(reducer)
+.toStream()
+.to(outputTopic, Produced.with(windowedSerde, 
Serdes.String()));
+
+startStreams();
+
+final List, String>> windowedOutput 
= receiveMessages(
+new TimeWindowedDeserializer<>(),
+new StringDeserializer(),
+String.class,
+25);
+
+// read from ConsoleConsumer
+final String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
+new TimeWindowedDeserializer(),
+new StringDeserializer(),
+String.class,
+25,
+true);
+
+final Comparator, String>> 
comparator =
+Comparator.comparing((KeyValueTimestamp, 
String> o) -> o.key().key())
+.thenComparing(KeyValueTimestamp::value);
+
+windowedOutput.sort(comparator);
+final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+final long firstBatchRightWindow = firstBatchTimestamp + 1;
+final long secondBatchLeftWindow = secondBatchTimestamp - 
timeDifference;
+final long secondBatchRightWindow = secondBatchTimestamp + 1;
+final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+final List, String>> expectResult = 
Arrays.asList(
+new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),

Review comment:
   There seems to be a bug in `TimeWindowedDeserializer` related to [this 
ticket](https://issues.apache.org/jira/browse/KAFKA-4468) that ends up setting 
the windowSize to `Long.MAX_VALUE`. For the purposes of testing, I don't think 
having it as the max value is totally awful (just somewhat awful) and the 
window end calculations are all tested in a different set of tests done through 
topology driver. I'll make a ticket for this bug and try to get it fixed when 
I'm done with testing





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465111665



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl extends AbstractStream 
implements TimeWindowedKStream {
+private final SlidingWindows windows;
+private final GroupedStreamAggregateBuilder aggregateBuilder;
+
+SlidingWindowedKStreamImpl(final SlidingWindows windows,
+   final InternalStreamsBuilder builder,
+   final Set subTopologySourceNodes,
+   final String name,
+   final Serde keySerde,
+   final Serde valueSerde,
+   final GroupedStreamAggregateBuilder 
aggregateBuilder,
+   final StreamsGraphNode streamsGraphNode) {
+super(name, keySerde, valueSerde, subTopologySourceNodes, 
streamsGraphNode, builder);
+this.windows = Objects.requireNonNull(windows, "windows can't be 
null");
+this.aggregateBuilder = aggregateBuilder;
+}
+
+@Override
+public KTable, Long> count() {
+return count(NamedInternal.empty());
+}
+
+@Override
+public KTable, Long> count(final Named named) {
+return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+}
+
+@Override
+public KTable, Long> count(final Materialized> materialized) {
+return count(NamedInternal.empty(), materialized);
+}
+
+@Override
+public KTable, Long> count(final Named named, final 
Materialized> materialized) {
+Objects.requireNonNull(materialized, "materialized can't be null");
+return doCount(named, materialized);
+}
+
+private KTable, Long> doCount(final Named named,
+  final Materialized> materialized) {
+final MaterializedInternal> 
materializedInternal =
+new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+
+if (materializedInternal.keySerde() == null) {
+materializedInternal.withKeySerde(keySerde);
+}
+if (materializedInternal.valueSerde() == null) {
+materializedInternal.withValueSerde(Serdes.Long());
+}
+
+final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+return aggregateBuilder.build(
+new NamedInternal(aggregateName),
+materialize(materializedInternal),
+new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), aggregateBuilder.countInitializer, 
aggregateBuilder.countAggregator),
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465094963



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
 )));
 }
 
+
+
+@Test
+public void shouldReduceSlidingWindows() throws Exception {
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+final long firstBatchTimestamp = 2000L;
+final long timeDifference = 1000L;
+produceMessages(firstBatchTimestamp);
+final long secondBatchTimestamp = firstBatchTimestamp + timeDifference 
/ 2;
+produceMessages(secondBatchTimestamp);
+final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference 
- 100L;
+produceMessages(thirdBatchTimestamp);
+
+final Serde> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class);
+groupedStream
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
+.reduce(reducer)
+.toStream()
+.to(outputTopic, Produced.with(windowedSerde, 
Serdes.String()));
+
+startStreams();
+
+final List, String>> windowedOutput 
= receiveMessages(
+new TimeWindowedDeserializer<>(),
+new StringDeserializer(),
+String.class,
+25);
+
+// read from ConsoleConsumer
+final String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
+new TimeWindowedDeserializer(),
+new StringDeserializer(),
+String.class,
+25,
+true);
+
+final Comparator, String>> 
comparator =
+Comparator.comparing((KeyValueTimestamp, 
String> o) -> o.key().key())
+.thenComparing(KeyValueTimestamp::value);
+
+windowedOutput.sort(comparator);
+final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+final long firstBatchRightWindow = firstBatchTimestamp + 1;
+final long secondBatchLeftWindow = secondBatchTimestamp - 
timeDifference;
+final long secondBatchRightWindow = secondBatchTimestamp + 1;
+final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+final List, String>> expectResult = 
Arrays.asList(
+new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),

Review comment:
   Because the windows are sorted, the windows created by each record 
aren't consecutive, so I added comments describing each window, but only did it 
for A since all the other keys are processed the exact same way. Sample 
comment: `// A @ secondBatchTimestamp left window created when A @ 
secondBatchTimestamp processed`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465079294



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl extends AbstractStream 
implements TimeWindowedKStream {
+private final SlidingWindows windows;
+private final GroupedStreamAggregateBuilder aggregateBuilder;
+
+SlidingWindowedKStreamImpl(final SlidingWindows windows,
+   final InternalStreamsBuilder builder,
+   final Set subTopologySourceNodes,
+   final String name,
+   final Serde keySerde,
+   final Serde valueSerde,
+   final GroupedStreamAggregateBuilder 
aggregateBuilder,
+   final StreamsGraphNode streamsGraphNode) {
+super(name, keySerde, valueSerde, subTopologySourceNodes, 
streamsGraphNode, builder);
+this.windows = Objects.requireNonNull(windows, "windows can't be 
null");
+this.aggregateBuilder = aggregateBuilder;
+}
+
+@Override
+public KTable, Long> count() {
+return count(NamedInternal.empty());
+}
+
+@Override
+public KTable, Long> count(final Named named) {
+return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+}
+
+@Override
+public KTable, Long> count(final Materialized> materialized) {
+return count(NamedInternal.empty(), materialized);
+}
+
+@Override
+public KTable, Long> count(final Named named, final 
Materialized> materialized) {
+Objects.requireNonNull(materialized, "materialized can't be null");
+return doCount(named, materialized);
+}
+
+private KTable, Long> doCount(final Named named,
+  final Materialized> materialized) {
+final MaterializedInternal> 
materializedInternal =
+new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+
+if (materializedInternal.keySerde() == null) {
+materializedInternal.withKeySerde(keySerde);
+}
+if (materializedInternal.valueSerde() == null) {
+materializedInternal.withValueSerde(Serdes.Long());
+}
+
+final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+return aggregateBuilder.build(
+new NamedInternal(aggregateName),
+materialize(materializedInternal),
+new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), aggregateBuilder.countInitializer, 
aggregateBuilder.countAggregator),
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-04 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465073439



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -132,16 +135,19 @@

 final boolean stateCreated,

 final StoreBuilder storeBuilder,

 final Windows windows,
+   
 final SlidingWindows slidingWindows,

 final SessionWindows sessionWindows,

 final Merger sessionMerger) {
 
 final ProcessorSupplier kStreamAggregate;
 
-if (windows == null && sessionWindows == null) {
+if (windows == null && slidingWindows == null && sessionWindows == 
null) {
 kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), 
initializer, aggregator);
-} else if (windows != null && sessionWindows == null) {
+} else if (windows != null && slidingWindows == null && sessionWindows 
== null) {
 kStreamAggregate = new KStreamWindowAggregate<>(windows, 
storeBuilder.name(), initializer, aggregator);
-} else if (windows == null && sessionMerger != null) {
+} else if (windows == null && slidingWindows != null && sessionWindows 
== null) {
+kStreamAggregate = new 
KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), 
initializer, aggregator);
+} else if (windows == null && slidingWindows == null && sessionMerger 
!= null) {

Review comment:
   Would checking both be redundant? It looks like the method that 
ultimately calls this one will check that sessionMerger is not null for session 
windows, so I think either both of these will be null or neither will be null





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463092579



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463091437



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463088582



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463088582



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463083821



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463083346



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463078058



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463077196



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463076565



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463073842



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463063406



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463056420



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463047237



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463046306



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+ 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463039985



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
##
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class SlidingWindowedKStreamImpl extends AbstractStream 
implements TimeWindowedKStream {
+private final SlidingWindows windows;
+private final GroupedStreamAggregateBuilder aggregateBuilder;
+
+SlidingWindowedKStreamImpl(final SlidingWindows windows,
+final InternalStreamsBuilder builder,
+final Set subTopologySourceNodes,
+final String name,
+final Serde keySerde,
+final Serde valueSerde,
+final GroupedStreamAggregateBuilder 
aggregateBuilder,
+final StreamsGraphNode streamsGraphNode) {
+super(name, keySerde, valueSerde, subTopologySourceNodes, 
streamsGraphNode, builder);
+this.windows = Objects.requireNonNull(windows, "windows can't be 
null");
+this.aggregateBuilder = aggregateBuilder;
+}
+
+@Override
+public KTable, Long> count() {
+return count(NamedInternal.empty());
+}
+
+@Override
+public KTable, Long> count(final Named named) {
+return doCount(named, Materialized.with(keySerde, Serdes.Long()));
+}
+
+
+@Override
+public KTable, Long> count(final Materialized> materialized) {
+return count(NamedInternal.empty(), materialized);
+}
+
+@Override
+public KTable, Long> count(final Named named, final 
Materialized> materialized) {
+Objects.requireNonNull(materialized, "materialized can't be null");
+
+// TODO: remove this when we do a topology-incompatible release
+// we used to burn a topology name here, so we have to keep doing it 
for compatibility
+if (new MaterializedInternal<>(materialized).storeName() == null) {
+builder.newStoreName(AGGREGATE_NAME);
+}
+
+return doCount(named, materialized);
+}
+
+private KTable, Long> doCount(final Named named,
+  final Materialized> materialized) {
+final MaterializedInternal> 
materializedInternal =
+new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+
+if (materializedInternal.keySerde() == null) {
+materializedInternal.withKeySerde(keySerde);
+}
+if (materializedInternal.valueSerde() == null) {
+materializedInternal.withValueSerde(Serdes.Long());
+}
+
+final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+
+return 

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463036048



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate implements 
KStreamAggProcessorSupplier, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer initializer,
+  final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {

Review comment:
   done!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-07-30 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r463026843



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -132,16 +135,19 @@

 final boolean stateCreated,

 final StoreBuilder storeBuilder,

 final Windows windows,
+   
 final SlidingWindows slidingWindows,

 final SessionWindows sessionWindows,

 final Merger sessionMerger) {
 
 final ProcessorSupplier kStreamAggregate;
 
-if (windows == null && sessionWindows == null) {
+if (windows == null && slidingWindows == null && sessionWindows == 
null) {
 kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), 
initializer, aggregator);
-} else if (windows != null && sessionWindows == null) {
+} else if (windows != null && slidingWindows == null && sessionWindows 
== null) {
 kStreamAggregate = new KStreamWindowAggregate<>(windows, 
storeBuilder.name(), initializer, aggregator);
-} else if (windows == null && sessionMerger != null) {
+} else if (windows == null && slidingWindows != null && sessionWindows 
== null) {
+kStreamAggregate = new 
KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), 
initializer, aggregator);
+} else if (windows == null && slidingWindows == null && sessionMerger 
!= null) {

Review comment:
   The original just had the check for `sessionMerger != null`, are there 
scenarios where the sessionMerger would be null but the sessionWindows 
wouldn't? I did think it was kind of inconsistent to check 'sessionMerger' just 
that one time and check 'sessionWindows' the other times so maybe it was a 
mistake





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org