KAFKA-3629; KStreamImpl.to(...) throws NPE when the value SerDe is null

guozhangwang

Author: Damian Guy <[email protected]>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #1272 from dguy/kstreamimpl-to-npe and squashes the following commits:

49d48fb [Damian Guy] actually commit the fix
07ce589 [Damian Guy] fix npe in KStreamImpl.to(..)
74d396d [Damian Guy] fix npe in KStreamImpl.to(..)


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c76b5fa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c76b5fa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c76b5fa

Branch: refs/heads/0.10.0
Commit: 4c76b5fa6a72412efa5936c284800148c2c69c24
Parents: 2885bc3
Author: Damian Guy <[email protected]>
Authored: Wed Apr 27 10:50:20 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed Apr 27 10:50:20 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/internals/KStreamImpl.java   | 2 +-
 .../kafka/streams/kstream/internals/KStreamImplTest.java      | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a84b4aa..91bcef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -298,7 +298,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String name = topology.newName(SINK_NAME);
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer();
+        Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
         
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index b5c3d47..3d45d1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -133,4 +133,11 @@ public class KStreamImplTest {
             1, // process
             builder.build("X", null).processors().size());
     }
+
+    @Test
+    public void testToWithNullValueSerdeDoesntNPE() {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
+        inputStream.to(stringSerde, null, "output");
+    }
 }

Reply via email to