This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2aca624  MINOR: Avoid double null check in KStream#transform() (#6429)
2aca624 is described below

commit 2aca6241624f6b924b3e0164a9b7d021d80096b6
Author: cadonna <cado...@users.noreply.github.com>
AuthorDate: Tue Mar 12 19:05:53 2019 +0100

    MINOR: Avoid double null check in KStream#transform() (#6429)
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../streams/kstream/internals/KStreamImpl.java     | 27 +++++++++++++---------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0eda64f..856536c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -439,17 +439,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         builder.addGraphNode(this.streamsGraphNode, sinkNode);
     }
 
-    @Override
-    public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? 
super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
-                                              final String... stateStoreNames) 
{
-        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
-        return flatTransform(new 
TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
-    }
-
-    @Override
-    public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? 
super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
-                                                  final String... 
stateStoreNames) {
-        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
+    private <K1, V1> KStream<K1, V1> doFlatTransform(final 
TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> 
transformerSupplier,
+                                                     final String... 
stateStoreNames) {
         final String name = builder.newProcessorName(TRANSFORM_NAME);
         final StatefulProcessorNode<? super K, ? super V> transformNode = new 
StatefulProcessorNode<>(
             name,
@@ -465,6 +456,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
+    public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? 
super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
+                                              final String... stateStoreNames) 
{
+        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
+        return doFlatTransform(new 
TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
+    }
+
+    @Override
+    public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? 
super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+                                                  final String... 
stateStoreNames) {
+        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
+        return doFlatTransform(transformerSupplier, stateStoreNames);
+    }
+
+    @Override
     public <VR> KStream<K, VR> transformValues(final 
ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                                final String... 
stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, 
"valueTransformSupplier can't be null");

Reply via email to