This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2aca624 MINOR: Avoid double null check in KStream#transform() (#6429) 2aca624 is described below commit 2aca6241624f6b924b3e0164a9b7d021d80096b6 Author: cadonna <cado...@users.noreply.github.com> AuthorDate: Tue Mar 12 19:05:53 2019 +0100 MINOR: Avoid double null check in KStream#transform() (#6429) Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/kstream/internals/KStreamImpl.java | 27 +++++++++++++--------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0eda64f..856536c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -439,17 +439,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K builder.addGraphNode(this.streamsGraphNode, sinkNode); } - @Override - public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, - final String... stateStoreNames) { - Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); - return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames); - } - - @Override - public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, - final String... stateStoreNames) { - Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); + private <K1, V1> KStream<K1, V1> doFlatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, + final String... stateStoreNames) { final String name = builder.newProcessorName(TRANSFORM_NAME); final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>( name, @@ -465,6 +456,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override + public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, + final String... stateStoreNames) { + Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); + return doFlatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames); + } + + @Override + public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, + final String... stateStoreNames) { + Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); + return doFlatTransform(transformerSupplier, stateStoreNames); + } + + @Override public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, final String... stateStoreNames) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");