[BEAM-2114] Throw instead of warning when KafkaIO cannot infer coder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10fc5f86 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10fc5f86 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10fc5f86 Branch: refs/heads/master Commit: 10fc5f86fac066423c77d9b6d9e7ed87ab32ef01 Parents: 10b3e3e Author: peay <p...@protonmail.com> Authored: Sat Apr 29 11:31:15 2017 +0200 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Sun Apr 30 09:42:52 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 +++++++++----- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 26 +++++++++++++++++--- 2 files changed, 35 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index b3591ce..8f94b8a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -291,17 +291,23 @@ public class KafkaIO { if (parameterizedType.getRawType() == Deserializer.class) { Type parameter = parameterizedType.getActualTypeArguments()[0]; + @SuppressWarnings("unchecked") + Class<T> clazz = (Class<T>) parameter; + try { - @SuppressWarnings("unchecked") - Class<T> clazz = (Class<T>) parameter; return NullableCoder.of(coderRegistry.getDefaultCoder(clazz)); } catch (CannotProvideCoderException e) { - LOG.warn("Could not infer coder from deserializer type", e); + throw new RuntimeException( + String.format("Unable to automatically infer a Coder for " + + "the 
Kafka Deserializer %s: no coder registered for type %s", + deserializer, clazz)); } } } - throw new RuntimeException("Could not extract deserializer type from " + deserializer); + throw new RuntimeException( + String.format("Could not extract the Kafka Deserializer type from %s", + deserializer)); } /** @@ -634,14 +640,14 @@ public class KafkaIO { Coder<K> keyCoder = checkNotNull( getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()), - "Key coder must be set"); + "Key coder must be inferable from input or set using readWithCoders"); Coder<V> valueCoder = checkNotNull( getValueCoder() != null ? getValueCoder() : inferCoder(registry, getValueDeserializer()), - "Value coder must be set"); + "Value coder must be inferable from input or set using readWithCoders"); // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. Unbounded<KafkaRecord<K, V>> unbounded = http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index a9c318d..2f895fe 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -63,7 +63,6 @@ import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; @@ -605,7 +604,6 @@ public class KafkaIOTest { } @Test - 
@Category(NeedsRunner.class) public void testUnboundedSourceMetrics() { int numElements = 1000; @@ -917,7 +915,24 @@ public class KafkaIOTest { @Override public void close() { + } + } + + // class for testing coder inference + private static class ObjectDeserializer + implements Deserializer<Object> { + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + } + @Override + public Object deserialize(String topic, byte[] bytes) { + return new Object(); + } + + @Override + public void close() { } } @@ -938,8 +953,13 @@ public class KafkaIOTest { instanceof VarLongCoder); } + @Test(expected = RuntimeException.class) + public void testInferKeyCoderFailure() { + CoderRegistry registry = CoderRegistry.createDefault(); + KafkaIO.inferCoder(registry, ObjectDeserializer.class); + } + @Test - @Category(NeedsRunner.class) public void testSinkMetrics() throws Exception { // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.