Repository: kafka Updated Branches: refs/heads/trunk a293e1dc0 -> efb060c57
KAFKA-5233; KIP-138: Change punctuate semantics Implementation for KIP-138: Change punctuate semantics Author: Michal Borowiecki <[email protected]> Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Eno Thereska <[email protected]>, Damian Guy <[email protected]> Closes #3055 from mihbor/KIP-138 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efb060c5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efb060c5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efb060c5 Branch: refs/heads/trunk Commit: efb060c57f05d1d586bb14c016b0187c60f8e994 Parents: a293e1d Author: Michal Borowiecki <[email protected]> Authored: Wed Jun 28 11:26:02 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Wed Jun 28 11:26:02 2017 +0100 ---------------------------------------------------------------------- docs/streams.html | 15 +- .../wordcount/WordCountProcessorDemo.java | 36 ++-- .../kafka/streams/kstream/Transformer.java | 13 +- .../kafka/streams/kstream/ValueTransformer.java | 13 +- .../internals/KStreamTransformValues.java | 8 + .../kafka/streams/processor/Cancellable.java | 23 +++ .../kafka/streams/processor/Processor.java | 8 +- .../streams/processor/ProcessorContext.java | 28 ++- .../streams/processor/PunctuationType.java | 34 ++++ .../kafka/streams/processor/Punctuator.java | 26 +++ .../internals/GlobalProcessorContextImpl.java | 29 ++- .../internals/ProcessorContextImpl.java | 16 +- .../processor/internals/ProcessorNode.java | 18 +- .../internals/ProcessorNodePunctuator.java | 26 +++ .../processor/internals/PunctuationQueue.java | 18 +- .../internals/PunctuationSchedule.java | 55 +++++- .../streams/processor/internals/Punctuator.java | 23 --- .../processor/internals/StandbyContextImpl.java | 12 ++ .../streams/processor/internals/StreamTask.java | 48 +++-- .../processor/internals/StreamThread.java | 38 +++- .../internals/AbstractProcessorContextTest.java | 8 + .../internals/PunctuationQueueTest.java | 33 ++-- .../processor/internals/StreamTaskTest.java | 191 +++++++++++++++---- .../apache/kafka/test/MockProcessorContext.java | 7 + .../apache/kafka/test/MockProcessorNode.java | 12 +- .../kafka/test/MockProcessorSupplier.java | 50 +++-- .../apache/kafka/test/NoOpProcessorContext.java | 8 +- 27 files changed, 633 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/docs/streams.html ---------------------------------------------------------------------- diff --git a/docs/streams.html b/docs/streams.html index 625736e..f302add 100644 --- a/docs/streams.html +++ b/docs/streams.html @@ -319,13 +319,12 @@ </p> <p> - The <code>Processor</code> interface provides two main API methods: - <code>process</code> and <code>punctuate</code>. The <code>process</code> method is performed on each - of the received record; and the <code>punctuate</code> method is performed periodically based on elapsed time. - In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the - <code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to - forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current - processing progress (<code>context().commit</code>), etc. + The <code>Processor</code> interface provides one main API method, the <code>process</code> method, + which is performed on each of the received records. + In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the <code>init</code> method + and use the context to schedule a periodically called punctuation function (<code>context().schedule</code>), + to forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), + to commit the current processing progress (<code>context().commit</code>), etc. </p> <p> @@ -344,7 +343,7 @@ public class MyProcessor implements Processor<String, String> { this.context = context; // call this processor's punctuate() method every 1000 milliseconds. - this.context.schedule(1000); + this.context.schedule(1000, PunctuationType.STREAM_TIME, this::punctuate); // retrieve the key-value store named "Counts" this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts"); http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 4a990a6..eceddf0 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -24,6 +24,8 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -56,9 +58,24 @@ public class WordCountProcessorDemo { @Override @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { this.context = context; - this.context.schedule(1000); + this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() { + @Override + public void punctuate(long timestamp) { + try (KeyValueIterator<String, Integer> iter = kvStore.all()) { + System.out.println("----------- " + timestamp + " ----------- "); + + while (iter.hasNext()) { + KeyValue<String, Integer> entry = iter.next(); + + System.out.println("[" + entry.key + ", " + entry.value + "]"); + + context.forward(entry.key, entry.value.toString()); + } + } + } + }); this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); } @@ -80,19 +97,8 @@ public class WordCountProcessorDemo { } @Override - public void punctuate(long timestamp) { - try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) { - System.out.println("----------- " + timestamp + " ----------- "); - - while (iter.hasNext()) { - KeyValue<String, Integer> entry = iter.next(); - - System.out.println("[" + entry.key + ", " + entry.value + "]"); - - context.forward(entry.key, entry.value.toString()); - } - } - } + @Deprecated + public void punctuate(long timestamp) {} @Override public void close() {} http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index 7265a11..2eb4921 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TimestampExtractor; @@ -27,8 +29,8 @@ import org.apache.kafka.streams.processor.TimestampExtractor; * This is a stateful record-by-record operation, i.e, {@link #transform(Object, Object)} is invoked individually for * each record of a stream and can access and modify a state that is available beyond a single call of * {@link #transform(Object, Object)} (cf. {@link KeyValueMapper} for stateless record transformation). - * Additionally, the interface can be called in regular intervals based on the processing progress - * (cf. {@link #punctuate(long)}. + * Additionally, this {@code Transformer} can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} + * a method to be {@link Punctuator#punctuate(long) called periodically} with the provided context. * <p> * Use {@link TransformerSupplier} to provide new instances of {@code Transformer} to Kafka Stream's runtime. * <p> @@ -51,8 +53,8 @@ public interface Transformer<K, V, R> { * This is called once per instance when the topology gets initialized. * <p> * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to - * {@link ProcessorContext#schedule(long) schedule itself} for periodical calls (cf. {@link #punctuate(long)}), and - * to access attached {@link StateStore}s. + * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be + * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s. * <p> * Note, that {@link ProcessorContext} is updated in the background with the current record's meta data. * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}. @@ -92,9 +94,12 @@ public interface Transformer<K, V, R> { * timestamps return by the used {@link TimestampExtractor}) * and not based on wall-clock time. * + * @deprecated Please use {@link Punctuator} functional interface instead. + * * @param timestamp the stream time when {@code punctuate} is being called * @return new {@link KeyValue} pair to be forwarded to down stream—if {@code null} will not be forwarded */ + @Deprecated R punctuate(final long timestamp); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 0936e7a..5463a76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TimestampExtractor; @@ -27,8 +29,8 @@ import org.apache.kafka.streams.processor.TimestampExtractor; * This is a stateful record-by-record operation, i.e, {@link #transform(Object)} is invoked individually for each * record of a stream and can access and modify a state that is available beyond a single call of * {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value transformation). - * Additionally, the interface can be called in regular intervals based on the processing progress - * (cf. {@link #punctuate(long)}. + * Additionally, this {@code ValueTransformer} can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} + * a method to be {@link Punctuator#punctuate(long) called periodically} with the provided context. * If {@code ValueTransformer} is applied to a {@link KeyValue} pair record the record's key is preserved. * <p> * Use {@link ValueTransformerSupplier} to provide new instances of {@code ValueTransformer} to Kafka Stream's runtime. @@ -48,8 +50,8 @@ public interface ValueTransformer<V, VR> { * This is called once per instance when the topology gets initialized. * <p> * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to - * {@link ProcessorContext#schedule(long) schedule itself} for periodical calls (cf. {@link #punctuate(long)}), and - * to access attached {@link StateStore}s. + * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be + * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s. * <p> * Note that {@link ProcessorContext} is updated in the background with the current record's meta data. * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}. @@ -93,9 +95,12 @@ public interface ValueTransformer<V, VR> { * timestamps return by the used {@link TimestampExtractor}) * and not based on wall-clock time. * + * @deprecated Please use {@link Punctuator} functional interface instead. + * * @param timestamp the stream time when {@code punctuate} is being called * @return must return {@code null}—otherwise, an {@link StreamsException exception} will be thrown */ + @Deprecated VR punctuate(final long timestamp); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java index 2e3211c..a6e9aaf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -21,9 +21,12 @@ import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; +import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -98,6 +101,11 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> } @Override + public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) { + return context.schedule(interval, type, callback); + } + + @Override public void schedule(final long interval) { context.schedule(interval); } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java new file mode 100644 index 0000000..82c9edd --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +public interface Cancellable { + + void cancel(); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java index 2aaf45e..2ed17df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java @@ -31,8 +31,9 @@ public interface Processor<K, V> { * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology * that contains it is initialized. * <p> - * If this processor is to be {@link #punctuate(long) called periodically} by the framework, then this method should - * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. + * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to + * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be + * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s. * * @param context the context; may not be null */ @@ -49,9 +50,12 @@ public interface Processor<K, V> { /** * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context * during {@link #init(ProcessorContext) initialization}. + * + * @deprecated Please use {@link Punctuator} functional interface instead. * * @param timestamp the stream time when this method is being called */ + @Deprecated void punctuate(long timestamp); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 559e9f7..3468f1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -88,11 +88,37 @@ public interface ProcessorContext { /** * Schedules a periodic operation for processors. A processor may call this method during + * {@link Processor#init(ProcessorContext) initialization} or + * {@link Processor#process(Object, Object) processing} to + * schedule a periodic callback - called a punctuation - to {@link Punctuator#punctuate(long)}. + * The type parameter controls what notion of time is used for punctuation: + * <ul> + * <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages + * in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use. + * <b>NOTE:</b> Only advanced if messages arrive</li> + * <li>{@link PunctuationType#SYSTEM_TIME} - uses system time (the wall-clock time), + * which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG}) + * independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited + * by how long an iteration of the processing loop takes to complete</li> + * </ul> + * + * @param interval the time interval between punctuations + * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#SYSTEM_TIME} + * @param callback a function consuming timestamps representing the current stream or system time + * @return a handle allowing cancellation of the punctuation schedule established by this method + */ + Cancellable schedule(long interval, PunctuationType type, Punctuator callback); + + /** + * Schedules a periodic operation for processors. A processor may call this method during * {@link Processor#init(ProcessorContext) initialization} to - * schedule a periodic call called a punctuation to {@link Processor#punctuate(long)}. + * schedule a periodic call - called a punctuation - to {@link Processor#punctuate(long)}. + * + * @deprecated Please use {@link #schedule(long, PunctuationType, Punctuator)} instead. * * @param interval the time interval between punctuations */ + @Deprecated void schedule(long interval); /** http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java new file mode 100644 index 0000000..4dd9300 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +/** + * Controls what notion of time is used for punctuation scheduled via {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)} schedule}: + * <ul> + * <li>STREAM_TIME - uses "stream time", which is advanced by the processing of messages + * in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use. + * <b>NOTE:</b> Only advanced if messages arrive</li> + * <li>SYSTEM_TIME - uses system time (the wall-clock time), + * which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG}) + * independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited + * by how long an iteration of the processing loop takes to complete</li> + * </ul> + */ +public enum PunctuationType { + STREAM_TIME, + SYSTEM_TIME, +} http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java new file mode 100644 index 0000000..200c1af --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +/** + * A functional interface used as an argument to {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}. + */ +public interface Punctuator { + + void punctuate(long timestamp); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 34d0c35..4c1d350 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -19,6 +19,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -57,14 +60,22 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { } } + + /** + * @throws UnsupportedOperationException on every invocation + */ @Override public <K, V> void forward(K key, V value, int childIndex) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); } + + /** + * @throws UnsupportedOperationException on every invocation + */ @Override public <K, V> void forward(K key, V value, String childName) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); } @Override @@ -72,9 +83,21 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { //no-op } + /** + * @throws UnsupportedOperationException on every invocation + */ + @Override + public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) { + throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context."); + } + + + /** + * @throws UnsupportedOperationException on every invocation + */ @Override public void schedule(long interval) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context."); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 55cddcc..79c38b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -19,6 +19,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -122,8 +125,19 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } @Override + public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) { + return task.schedule(interval, type, callback); + } + + @Override + @Deprecated public void schedule(final long interval) { - task.schedule(interval); + schedule(interval, PunctuationType.STREAM_TIME, new Punctuator() { + @Override + public void punctuate(final long timestamp) { + currentNode().processor().punctuate(timestamp); + } + }); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 8112614..47f6311 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.Punctuator; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -65,14 +66,6 @@ public class ProcessorNode<K, V> { } }; - private long timestamp; - private Runnable punctuateDelegate = new Runnable() { - @Override - public void run() { - processor().punctuate(timestamp); - } - }; - public final Set<String> stateStores; public ProcessorNode(String name) { @@ -133,8 +126,13 @@ public class ProcessorNode<K, V> { this.nodeMetrics.metrics.measureLatencyNs(time, processDelegate, nodeMetrics.nodeProcessTimeSensor); } - public void punctuate(long timestamp) { - this.timestamp = timestamp; + public void punctuate(final long timestamp, final Punctuator punctuator) { + Runnable punctuateDelegate = new Runnable() { + @Override + public void run() { + punctuator.punctuate(timestamp); + } + }; this.nodeMetrics.metrics.measureLatencyNs(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor); } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java new file mode 100644 index 0000000..c80a3e8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; + +public interface ProcessorNodePunctuator { + + void punctuate(ProcessorNode node, long streamTime, PunctuationType type, Punctuator punctuator); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java index 0f51852..ec047e6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java @@ -16,16 +16,20 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; + import java.util.PriorityQueue; public class PunctuationQueue { private final PriorityQueue<PunctuationSchedule> pq = new PriorityQueue<>(); - public void schedule(PunctuationSchedule sched) { + public Cancellable schedule(PunctuationSchedule sched) { synchronized (pq) { pq.add(sched); } + return sched.cancellable(); } public void close() { @@ -34,16 +38,20 @@ public class PunctuationQueue { } } - public boolean mayPunctuate(long timestamp, Punctuator punctuator) { + public boolean mayPunctuate(final long timestamp, final PunctuationType type, final ProcessorNodePunctuator processorNodePunctuator) { synchronized (pq) { boolean punctuated = false; PunctuationSchedule top = pq.peek(); while (top != null && top.timestamp <= timestamp) { PunctuationSchedule sched = top; pq.poll(); - punctuator.punctuate(sched.node(), timestamp); - pq.add(sched.next(timestamp)); - punctuated = true; + + if (!sched.isCancelled()) { + processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator()); + pq.add(sched.next(timestamp)); + punctuated = true; + } + top = pq.peek(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java index be792ba..cf50005 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java @@ -16,30 +16,73 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.Punctuator; + public class PunctuationSchedule extends Stamped<ProcessorNode> { - final long interval; + private final long interval; + private final Punctuator punctuator; + private boolean isCancelled = false; + // this Cancellable will be re-pointed at the successor schedule in next() + private final RepointableCancellable cancellable; - public PunctuationSchedule(ProcessorNode node, long interval) { - this(node, 0L, interval); + PunctuationSchedule(ProcessorNode node, long interval, Punctuator punctuator) { + this(node, 0L, interval, punctuator, new RepointableCancellable()); + cancellable.setSchedule(this); } - public PunctuationSchedule(ProcessorNode node, long time, long interval) { + private PunctuationSchedule(ProcessorNode node, long time, long interval, Punctuator punctuator, RepointableCancellable cancellable) { super(node, time); this.interval = interval; + this.punctuator = punctuator; + this.cancellable = cancellable; } public ProcessorNode node() { return value; } + public Punctuator punctuator() { + return punctuator; + } + + public Cancellable cancellable() { + return cancellable; + } + + void markCancelled() { + isCancelled = true; + } + + boolean isCancelled() { + return isCancelled; + } + public PunctuationSchedule next(long currTimestamp) { + PunctuationSchedule nextSchedule; // we need to special handle the case when it is firstly triggered (i.e. the timestamp // is equal to the interval) by reschedule based on the currTimestamp if (timestamp == 0L) - return new PunctuationSchedule(value, currTimestamp + interval, interval); + nextSchedule = new PunctuationSchedule(value, currTimestamp + interval, interval, punctuator, cancellable); else - return new PunctuationSchedule(value, timestamp + interval, interval); + nextSchedule = new PunctuationSchedule(value, timestamp + interval, interval, punctuator, cancellable); + + cancellable.setSchedule(nextSchedule); + + return nextSchedule; } + private static class RepointableCancellable implements Cancellable { + private PunctuationSchedule schedule; + + synchronized void setSchedule(PunctuationSchedule schedule) { + this.schedule = schedule; + } + + @Override + synchronized public void cancel() { + schedule.markCancelled(); + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java deleted file mode 100644 index 4bac97d..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -public interface Punctuator { - - void punctuate(ProcessorNode node, long streamTime); - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 0791c67..812a4ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -20,6 +20,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; @@ -156,6 +159,15 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle * @throws UnsupportedOperationException on every invocation */ @Override + public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) { + throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); + } + + /** + * @throws UnsupportedOperationException on every invocation + */ + @Override + @Deprecated public void schedule(final long interval) { throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 55e1ffe..dfb28f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -29,7 +29,10 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -46,7 +49,7 @@ import static java.util.Collections.singleton; /** * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing. */ -public class StreamTask extends AbstractTask implements Punctuator { +public class StreamTask extends AbstractTask implements ProcessorNodePunctuator { private static final Logger log = LoggerFactory.getLogger(StreamTask.class); @@ -54,7 +57,8 @@ public class StreamTask extends AbstractTask implements Punctuator { private final PartitionGroup partitionGroup; private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo(); - private final PunctuationQueue punctuationQueue; + private final PunctuationQueue streamTimePunctuationQueue; + private final PunctuationQueue systemTimePunctuationQueue; private final Map<TopicPartition, Long> consumedOffsets; private final RecordCollector recordCollector; @@ -109,7 +113,8 @@ public class StreamTask extends AbstractTask implements Punctuator { final Time time, final Producer<byte[], byte[]> producer) { super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache, config); - punctuationQueue = new PunctuationQueue(); + streamTimePunctuationQueue = new PunctuationQueue(); + systemTimePunctuationQueue = new PunctuationQueue(); maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); this.metrics = new TaskMetrics(metrics); @@ -219,7 +224,7 @@ public class StreamTask extends AbstractTask implements Punctuator { * @throws IllegalStateException if the current node is not null */ @Override - public void punctuate(final ProcessorNode node, final long timestamp) { + public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) { if (processorContext.currentNode() != null) { throw new IllegalStateException(String.format("%s Current node is not null", logPrefix)); } @@ -227,11 +232,11 @@ public class StreamTask extends AbstractTask implements Punctuator { updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node); if (log.isTraceEnabled()) { - log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, node.name(), timestamp); + log.trace("{} Punctuating processor {} with timestamp {} and punctuation type {}", logPrefix, node.name(), timestamp, type); } try { - node.punctuate(timestamp); + node.punctuate(timestamp, punctuator); } catch (final KafkaException e) { throw new StreamsException(String.format("%s Exception caught while punctuating processor '%s'", logPrefix, node.name()), e); } finally { @@ -484,14 +489,24 @@ public class StreamTask extends AbstractTask implements Punctuator { * Schedules a punctuation for the processor * * @param interval the interval in milliseconds + * @param type * @throws IllegalStateException if the current node is not null */ - public void schedule(final long interval) { + public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) { if (processorContext.currentNode() == null) { throw new IllegalStateException(String.format("%s Current node is null", logPrefix)); } - punctuationQueue.schedule(new PunctuationSchedule(processorContext.currentNode(), interval)); + final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), interval, punctuator); + + switch (type) { + case STREAM_TIME: + return streamTimePunctuationQueue.schedule(schedule); + case SYSTEM_TIME: + return systemTimePunctuationQueue.schedule(schedule); + default: + throw new IllegalArgumentException("Unrecognized PunctuationType: " + type); + } } /** @@ -502,10 +517,11 @@ public class StreamTask extends AbstractTask implements Punctuator { } /** - * Possibly trigger registered punctuation functions if + * Possibly trigger registered stream-time punctuation functions if * current partition group timestamp has reached the defined stamp + * Note, this is only called in the presence of new records */ - boolean maybePunctuate() { + boolean maybePunctuateStreamTime() { final long timestamp = partitionGroup.timestamp(); // if the timestamp is not known yet, meaning there is not enough data accumulated @@ -513,11 +529,21 @@ public class StreamTask extends AbstractTask implements Punctuator { if (timestamp == TimestampTracker.NOT_KNOWN) { return false; } else { - return punctuationQueue.mayPunctuate(timestamp, this); + return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); } } /** + * Possibly trigger registered system-time punctuation functions if + * current system timestamp has reached the defined stamp + * Note, this is called irrespective of the presence of new records + */ + boolean maybePunctuateSystemTime() { + final long timestamp = time.milliseconds(); + + return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.SYSTEM_TIME, this); + } + /** * Request committing the current task's state */ void needCommit() { http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3fd7832..ae344b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -546,7 +546,7 @@ public class StreamThread extends Thread { if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) { streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); addRecordsToTasks(records); - final long totalProcessed = processAndPunctuate(activeTasks, recordsProcessedBeforeCommit); + final long totalProcessed = processAndPunctuateStreamTime(activeTasks, recordsProcessedBeforeCommit); if (totalProcessed > 0) { final long processLatency = computeLatency(); streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, @@ -556,6 +556,7 @@ public class StreamThread extends Thread { } } + maybePunctuateSystemTime(); maybeCommit(timerStartedMs); maybeUpdateStandbyTasks(timerStartedMs); maybeClean(timerStartedMs); @@ -653,8 +654,8 @@ public class StreamThread extends Thread { * if UNLIMITED_RECORDS, then commit is never called * @return Number of records processed since last commit. */ - private long processAndPunctuate(final Map<TaskId, StreamTask> tasks, - final long recordsProcessedBeforeCommit) { + private long processAndPunctuateStreamTime(final Map<TaskId, StreamTask> tasks, + final long recordsProcessedBeforeCommit) { long totalProcessedEachRound; long totalProcessedSinceLastMaybeCommit = 0; @@ -699,7 +700,7 @@ public class StreamThread extends Thread { @Override public void apply(final StreamTask task) { name = "punctuate"; - maybePunctuate(task); + maybePunctuateStreamTime(task); if (task.commitNeeded()) { name = "commit"; @@ -721,11 +722,11 @@ public class StreamThread extends Thread { return totalProcessedSinceLastMaybeCommit; } - private void maybePunctuate(final StreamTask task) { + private void maybePunctuateStreamTime(final StreamTask task) { try { // check whether we should punctuate based on the task's partition group timestamp; // which are essentially based on record timestamp. - if (task.maybePunctuate()) { + if (task.maybePunctuateStreamTime()) { streamsMetrics.punctuateTimeSensor.record(computeLatency(), timerStartedMs); } } catch (final KafkaException e) { @@ -734,6 +735,31 @@ public class StreamThread extends Thread { } } + private void maybePunctuateSystemTime() { + final RuntimeException e = performOnStreamTasks(new StreamTaskAction() { + @Override + public String name() { + return "punctuate"; + } + + @Override + public void apply(final StreamTask task) { + try { + // check whether we should punctuate based on system timestamp + if (task.maybePunctuateSystemTime()) { + streamsMetrics.punctuateTimeSensor.record(computeLatency(), timerStartedMs); + } + } catch (final KafkaException e) { + log.error("{} Failed to punctuate active task {}: {}", logPrefix, task.id(), e); + throw e; + } + } + }); + if (e != null) { + throw e; + } + } + /** * Adjust the number of records that should be processed by scheduler. This avoids * scenarios where the processing time is higher than the commit time. http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 6adaa42..e0eed76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -18,6 +18,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -145,6 +148,11 @@ public class AbstractProcessorContextTest { } @Override + public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) { + return null; + } + + @Override public void schedule(final long interval) { } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java index a23ff75..1570c9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java @@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.junit.Test; import java.util.ArrayList; @@ -28,34 +30,41 @@ public class PunctuationQueueTest { @Test public void testPunctuationInterval() { - TestProcessor processor = new TestProcessor(); - ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null); - PunctuationQueue queue = new PunctuationQueue(); + final TestProcessor processor = new TestProcessor(); + final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null); + final PunctuationQueue queue = new PunctuationQueue(); + final Punctuator punctuator = new Punctuator() { + @Override + public void punctuate(long timestamp) { + node.processor().punctuate(timestamp); + } + }; - PunctuationSchedule sched = new PunctuationSchedule(node, 100L); + final PunctuationSchedule sched = new PunctuationSchedule(node, 100L, punctuator); final long now = sched.timestamp - 100L; queue.schedule(sched); - Punctuator punctuator = new Punctuator() { - public void punctuate(ProcessorNode node, long time) { - node.processor().punctuate(time); + ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() { + @Override + public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) { + punctuator.punctuate(time); } }; - queue.mayPunctuate(now, punctuator); + queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator); assertEquals(0, processor.punctuatedAt.size()); - queue.mayPunctuate(now + 99L, punctuator); + queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator); assertEquals(0, processor.punctuatedAt.size()); - queue.mayPunctuate(now + 100L, punctuator); + queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator); assertEquals(1, processor.punctuatedAt.size()); - queue.mayPunctuate(now + 199L, punctuator); + queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator); assertEquals(1, processor.punctuatedAt.size()); - queue.mayPunctuate(now + 200L, punctuator); + queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator); assertEquals(2, processor.punctuatedAt.size()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 130783d..1a0bebe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -38,7 +38,10 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; @@ -86,10 +89,11 @@ public class StreamTaskTest { private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1, intDeserializer, intDeserializer); private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2, intDeserializer, intDeserializer); - private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10L); + private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L); + private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.SYSTEM_TIME); private final ProcessorTopology topology = new ProcessorTopology( - Arrays.<ProcessorNode>asList(source1, source2, processor), + Arrays.<ProcessorNode>asList(source1, source2, processorStreamTime, processorSystemTime), new HashMap<String, SourceNode>() { { put(topic1[0], source1); @@ -117,6 +121,14 @@ public class StreamTaskTest { private StreamsConfig config; private StreamsConfig eosConfig; private StreamTask task; + private long punctuatedAt; + + private Punctuator punctuator = new Punctuator() { + @Override + public void punctuate(long timestamp) { + punctuatedAt = timestamp; + } + }; private StreamsConfig createConfig(final boolean enableEoS) throws Exception { return new StreamsConfig(new Properties() { @@ -133,14 +145,13 @@ public class StreamTaskTest { }); } - - - @Before public void setup() throws Exception { consumer.assign(Arrays.asList(partition1, partition2)); - source1.addChild(processor); - source2.addChild(processor); + source1.addChild(processorStreamTime); + source2.addChild(processorStreamTime); + source1.addChild(processorSystemTime); + source2.addChild(processorSystemTime); config = createConfig(false); eosConfig = createConfig(true); stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime()); @@ -282,7 +293,7 @@ public class StreamTaskTest { @SuppressWarnings("unchecked") @Test - public void testMaybePunctuate() throws Exception { + public void testMaybePunctuateStreamTime() throws Exception { task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -295,42 +306,42 @@ public class StreamTaskTest { new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); - assertTrue(task.maybePunctuate()); + assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(5, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(0, source2.numReceived); - assertFalse(task.maybePunctuate()); + assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(4, task.numBuffered()); assertEquals(1, source1.numReceived); assertEquals(1, source2.numReceived); - assertTrue(task.maybePunctuate()); + assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(3, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(1, source2.numReceived); - assertFalse(task.maybePunctuate()); + assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(2, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(2, source2.numReceived); - assertTrue(task.maybePunctuate()); + assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(1, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(2, source2.numReceived); - assertFalse(task.maybePunctuate()); + assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(0, task.numBuffered()); @@ -338,9 +349,71 @@ public class StreamTaskTest { assertEquals(3, source2.numReceived); assertFalse(task.process()); - assertFalse(task.maybePunctuate()); + assertFalse(task.maybePunctuateStreamTime()); + + processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 30L, 40L); + } + + @SuppressWarnings("unchecked") + @Test + public void testCancelPunctuateStreamTime() throws Exception { + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + assertTrue(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + + assertFalse(task.maybePunctuateStreamTime()); + + assertTrue(task.process()); + + processorStreamTime.supplier.scheduleCancellable.cancel(); + + assertFalse(task.maybePunctuateStreamTime()); + + processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L); + } + + @Test + public void shouldPunctuateSystemTimeWhenIntervalElapsed() throws Exception { + long now = time.milliseconds(); + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10, now + 20, now + 30); + } + + @Test + public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() throws Exception { + long now = time.milliseconds(); + assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate + time.sleep(9); + assertFalse(task.maybePunctuateSystemTime()); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now); + } - processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L); + @Test + public void testCancelPunctuateSystemTime() throws Exception { + long now = time.milliseconds(); + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + processorSystemTime.supplier.scheduleCancellable.cancel(); + time.sleep(10); + assertFalse(task.maybePunctuateSystemTime()); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10); } @SuppressWarnings("unchecked") @@ -388,10 +461,10 @@ public class StreamTaskTest { } } - @SuppressWarnings("unchecked") + @SuppressWarnings(value = {"unchecked", "deprecation"}) @Test - public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() throws Exception { - final ProcessorNode punctuator = new ProcessorNode("test", new AbstractProcessor() { + public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() throws Exception { + final Processor processor = new AbstractProcessor() { @Override public void init(final ProcessorContext context) { context.schedule(1); @@ -404,11 +477,51 @@ public class StreamTaskTest { public void punctuate(final long timestamp) { throw new KafkaException("KABOOM!"); } - }, Collections.<String>emptySet()); + }; + + final ProcessorNode punctuator = new ProcessorNode("test", processor, Collections.<String>emptySet()); + punctuator.init(new NoOpProcessorContext()); + + try { + task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() { + @Override + public void punctuate(long timestamp) { + processor.punctuate(timestamp); + } + }); + fail("Should've thrown StreamsException"); + } catch (final StreamsException e) { + final String message = e.getMessage(); + assertTrue("message=" + message + " should contain processor", message.contains("processor 'test'")); + assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() throws Exception { + final Processor processor = new AbstractProcessor() { + @Override + public void init(final ProcessorContext context) { + } + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void punctuate(final long timestamp) {} + }; + + final ProcessorNode punctuator = new ProcessorNode("test", processor, Collections.<String>emptySet()); punctuator.init(new NoOpProcessorContext()); try { - task.punctuate(punctuator, 1); + task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() { + @Override + public void punctuate(long timestamp) { + throw new KafkaException("KABOOM!"); + } + }); fail("Should've thrown StreamsException"); } catch (final StreamsException e) { final String message = e.getMessage(); @@ -567,9 +680,9 @@ public class StreamTaskTest { @Test public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception { - ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor); + ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime); try { - task.punctuate(processor, 10); + task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator); fail("Should throw illegal state exception as current node is not null"); } catch (final IllegalStateException e) { // pass @@ -578,27 +691,37 @@ public class StreamTaskTest { @Test public void shouldCallPunctuateOnPassedInProcessorNode() throws Exception { - task.punctuate(processor, 5); - assertThat(processor.punctuatedAt, equalTo(5L)); - task.punctuate(processor, 10); - assertThat(processor.punctuatedAt, equalTo(10L)); + task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator); + assertThat(punctuatedAt, equalTo(5L)); + task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator); + assertThat(punctuatedAt, equalTo(10L)); } @Test public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() throws Exception { - task.punctuate(processor, 5); + task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator); assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue()); } @Test(expected = IllegalStateException.class) public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() throws Exception { - task.schedule(1); + task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() { + @Override + public void punctuate(long timestamp) { + // no-op + } + }); } @Test - public void shouldNotThrowIExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception { - ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor); - task.schedule(1); + public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception { + ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime); + task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() { + @Override + public void punctuate(long timestamp) { + // no-op + } + }); } @SuppressWarnings("unchecked") @@ -612,7 +735,7 @@ public class StreamTaskTest { } catch (final RuntimeException e) { task = null; } - assertTrue(processor.closed); + assertTrue(processorStreamTime.closed); assertTrue(source1.closed); assertTrue(source2.closed); } @@ -780,7 +903,7 @@ public class StreamTaskTest { throw new RuntimeException("KABOOM!"); } }; - final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processor, source1, source2); + final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processorStreamTime, source1, source2); final Map<String, SourceNode> sourceNodes = new HashMap() { { put(topic1[0], processorNode); http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index cb56fa1..515d35d 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -24,6 +24,9 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -167,6 +170,10 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol return storeMap.get(name); } + @Override public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) { + throw new UnsupportedOperationException("schedule() not supported."); + } + @Override public void schedule(final long interval) { throw new UnsupportedOperationException("schedule() not supported."); http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java index 2fe44f0..38b0e7d 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java @@ -17,6 +17,8 @@ package org.apache.kafka.test; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.internals.ProcessorNode; import java.util.Collections; @@ -33,7 +35,11 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> { public boolean initialized; public MockProcessorNode(long scheduleInterval) { - this(new MockProcessorSupplier<K, V>(scheduleInterval)); + this(scheduleInterval, PunctuationType.STREAM_TIME); + } + + public MockProcessorNode(long scheduleInterval, PunctuationType punctuationType) { + this(new MockProcessorSupplier<K, V>(scheduleInterval, punctuationType)); } private MockProcessorNode(MockProcessorSupplier<K, V> supplier) { @@ -54,8 +60,8 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> { } @Override - public void punctuate(final long timestamp) { - super.punctuate(timestamp); + public void punctuate(final long timestamp, final Punctuator punctuator) { + super.punctuate(timestamp, punctuator); this.punctuatedAt = timestamp; } http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java index 571e084..c464aaa 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java @@ -17,9 +17,12 @@ package org.apache.kafka.test; import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import java.util.ArrayList; @@ -28,30 +31,57 @@ import static org.junit.Assert.assertEquals; public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> { public final ArrayList<String> processed = new ArrayList<>(); - public final ArrayList<Long> punctuated = new ArrayList<>(); + public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>(); + public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>(); private final long scheduleInterval; + private final PunctuationType punctuationType; + public Cancellable scheduleCancellable; public MockProcessorSupplier() { this(-1L); } public MockProcessorSupplier(long scheduleInterval) { + this(scheduleInterval, PunctuationType.STREAM_TIME); + } + + public MockProcessorSupplier(long scheduleInterval, PunctuationType punctuationType) { this.scheduleInterval = scheduleInterval; + this.punctuationType = punctuationType; } @Override public Processor<K, V> get() { - return new MockProcessor(); + return new MockProcessor(punctuationType); } public class MockProcessor extends AbstractProcessor<K, V> { + PunctuationType punctuationType; + + public MockProcessor(PunctuationType punctuationType) { + this.punctuationType = punctuationType; + } + @Override public void init(ProcessorContext context) { super.init(context); - if (scheduleInterval > 0L) - context.schedule(scheduleInterval); + if (scheduleInterval > 0L) { + scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() { + @Override + public void punctuate(long timestamp) { + if (punctuationType == PunctuationType.STREAM_TIME) { + assertEquals(timestamp, context().timestamp()); + } + assertEquals(-1, context().partition()); + assertEquals(-1L, context().offset()); + + (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime) + .add(timestamp); + } + }); + } } @Override @@ -60,15 +90,6 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> { (value == null ? "null" : value)); } - - @Override - public void punctuate(long streamTime) { - assertEquals(streamTime, context().timestamp()); - assertEquals(-1, context().partition()); - assertEquals(-1L, context().offset()); - - punctuated.add(streamTime); - } } public void checkAndClearProcessResult(String... expected) { @@ -86,7 +107,8 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> { processed.clear(); } - public void checkAndClearPunctuateResult(long... expected) { + public void checkAndClearPunctuateResult(PunctuationType type, long... expected) { + ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime; assertEquals("the number of outputs:", expected.length, punctuated.size()); for (int i = 0; i < expected.length; i++) { http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 8e399c5..1b9cfed 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -18,6 +18,9 @@ package org.apache.kafka.test; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -48,9 +51,12 @@ public class NoOpProcessorContext extends AbstractProcessorContext { return null; } + @Override public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) { + return null; + } + @Override public void schedule(final long interval) { - } @Override
