Repository: kafka Updated Branches: refs/heads/trunk 5fcc4584d -> cb725c6a2 (forced update)
KAFKA-4720: Add a KStream#peek(ForeachAction<K, V>) in DSL https://issues.apache.org/jira/browse/KAFKA-4720 Peek is a handy method for inserting diagnostics that do not affect the stream itself but only some external state, such as logging or metrics collection. Author: Steven Schlansker <[email protected]> Reviewers: Damian Guy, Matthias J. Sax, Eno Thereska, Guozhang Wang Closes #2493 from stevenschlansker/kafka-4720-peek Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb725c6a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb725c6a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb725c6a Branch: refs/heads/trunk Commit: cb725c6a2d21651fe8b2e50225635209aa949bb5 Parents: af18248 Author: Steven Schlansker <[email protected]> Authored: Wed Feb 15 21:19:15 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Feb 15 21:20:49 2017 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 14 ++++ .../streams/kstream/internals/KStreamImpl.java | 12 +++ .../streams/kstream/internals/KStreamPeek.java | 45 +++++++++++ .../kstream/internals/KStreamPeekTest.java | 85 ++++++++++++++++++++ 4 files changed, 156 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 21135fb..64187e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -406,6 +406,20 @@ public interface KStream<K, V> { void foreach(final ForeachAction<? super K, ? 
super V> action); /** + * Perform an action on each record of {@code KStream}. + * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}). + * <p> + * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) + * and returns an unchanged stream. + * <p> + * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases. + * + * @param action an action to perform on each record + * @see #process(ProcessorSupplier, String...) + */ + KStream<K, V> peek(final ForeachAction<? super K, ? super V> action); + + /** * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on * the supplied predicates. * Each record is evaluated against the supplied predicates, and predicates are evaluated in order. http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0434f06..f325dcf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -57,6 +57,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public static final String FILTER_NAME = "KSTREAM-FILTER-"; + public static final String PEEK_NAME = "KSTREAM-PEEK-"; + private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-"; private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-"; @@ -318,6 +320,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public KStream<K, V> peek(final 
ForeachAction<? super K, ? super V> action) { + Objects.requireNonNull(action, "action can't be null"); + final String name = topology.newName(PEEK_NAME); + + topology.addProcessor(name, new KStreamPeek<>(action), this.name); + + return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired); + } + + @Override public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) { return through(keySerde, valSerde, null, topic); } http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java new file mode 100644 index 0000000..3dc0513 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +class KStreamPeek<K, V> implements ProcessorSupplier<K, V> { + + private final ForeachAction<K, V> action; + + public KStreamPeek(final ForeachAction<K, V> action) { + this.action = action; + } + + @Override + public Processor<K, V> get() { + return new KStreamPeekProcessor(); + } + + private class KStreamPeekProcessor extends AbstractProcessor<K, V> { + @Override + public void process(final K key, final V value) { + action.apply(key, value); + context().forward(key, value); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java new file mode 100644 index 0000000..48f4b65 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.test.KStreamTestDriver; +import org.junit.After; +import org.junit.Test; + +public class KStreamPeekTest { + + private final String topicName = "topic"; + + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + } + + @Test + public void shouldObserveStreamElements() { + final KStreamBuilder builder = new KStreamBuilder(); + final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); + final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>(); + stream.peek(collect(peekObserved)).foreach(collect(streamObserved)); + + driver = new KStreamTestDriver(builder); + final List<KeyValue<Integer, String>> expected = new ArrayList<>(); + for (int key = 0; key < 32; key++) { + final String value = "V" + key; + driver.process(topicName, key, value); + expected.add(new KeyValue<>(key, value)); + } + + assertEquals(expected, peekObserved); + assertEquals(expected, streamObserved); + } + + @Test + public void 
shouldNotAllowNullAction() { + final KStreamBuilder builder = new KStreamBuilder(); + final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); + try { + stream.peek(null); + fail("expected null action to throw NPE"); + } catch (NullPointerException expected) { } + } + + private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> into) { + return new ForeachAction<K, V>() { + @Override + public void apply(final K key, final V value) { + into.add(new KeyValue<>(key, value)); + } + }; + } +}
