KAFKA-3629; KStreamImpl.to(...) throws NPE when the value SerDe is null guozhangwang
Author: Damian Guy <[email protected]> Reviewers: Matthias J. Sax, Guozhang Wang Closes #1272 from dguy/kstreamimpl-to-npe and squashes the following commits: 49d48fb [Damian Guy] actually commit the fix 07ce589 [Damian Guy] fix npe in KStreamImpl.to(..) 74d396d [Damian Guy] fix npe in KStreamImpl.to(..) Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c76b5fa Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c76b5fa Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c76b5fa Branch: refs/heads/0.10.0 Commit: 4c76b5fa6a72412efa5936c284800148c2c69c24 Parents: 2885bc3 Author: Damian Guy <[email protected]> Authored: Wed Apr 27 10:50:20 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Apr 27 10:50:20 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/internals/KStreamImpl.java | 2 +- .../kafka/streams/kstream/internals/KStreamImplTest.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index a84b4aa..91bcef9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -298,7 +298,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V String name = topology.newName(SINK_NAME); Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); - Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer(); + Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer(); if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; http://git-wip-us.apache.org/repos/asf/kafka/blob/4c76b5fa/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index b5c3d47..3d45d1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -133,4 +133,11 @@ public class KStreamImplTest { 1, // process builder.build("X", null).processors().size()); } + + @Test + public void testToWithNullValueSerdeDoesntNPE() { + final KStreamBuilder builder = new KStreamBuilder(); + final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input"); + inputStream.to(stringSerde, null, "output"); + } }
