Repository: kafka Updated Branches: refs/heads/trunk c2a8b8611 -> b51002c57
KAFKA-4312: If filePath is empty string writeAsText should have more meaningful error message â¦eaningful error message Author: bbejeck <bbej...@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #2042 from bbejeck/KAFKA-4312_write_as_text_throws_NPE_empty_string Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b51002c5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b51002c5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b51002c5 Branch: refs/heads/trunk Commit: b51002c576ea9758132d75a8a0fe454e1bc270a2 Parents: c2a8b86 Author: Bill Bejeck <bbej...@gmail.com> Authored: Wed Oct 19 14:29:53 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Oct 19 14:29:53 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/internals/KStreamImpl.java | 3 +++ .../org/apache/kafka/streams/kstream/internals/KTableImpl.java | 4 ++++ .../kafka/streams/kstream/internals/KStreamImplTest.java | 6 ++++++ .../apache/kafka/streams/kstream/internals/KTableImplTest.java | 6 ++++++ 4 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index b6c3401..bb77e96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -207,6 +207,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) { Objects.requireNonNull(filePath, "filePath can't be null"); + if (filePath.trim().isEmpty()) { + throw new TopologyBuilderException("filePath can't be an empty string"); + } String name = topology.newName(PRINTING_NAME); streamName = (streamName == null) ? this.name : streamName; try { http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index c53e761..fc1c076 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -189,6 +189,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, */ @Override public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) { + Objects.requireNonNull(filePath, "filePath can't be null"); + if (filePath.trim().isEmpty()) { + throw new TopologyBuilderException("filePath can't be an empty string"); + } String name = topology.newName(PRINTING_NAME); streamName = (streamName == null) ? this.name : streamName; try { http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index fb2afec..e5e334c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -183,6 +184,11 @@ public class KStreamImplTest { testStream.writeAsText(null); } + @Test(expected = TopologyBuilderException.class) + public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception { + testStream.writeAsText("\t \t"); + } + @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnFlatMap() throws Exception { testStream.flatMap(null); http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 4b9ea06..afa1033 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Predicate; @@ -402,6 +403,11 @@ public class KTableImplTest { table.writeAsText(null); } + @Test(expected = TopologyBuilderException.class) + public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception { + table.writeAsText("\t \t"); + } + @Test(expected = NullPointerException.class) public void shouldNotAllowNullActionOnForEach() throws Exception { table.foreach(null);