[BEAM-2114] Throw instead of warning when KafkaIO cannot infer coder

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10fc5f86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10fc5f86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10fc5f86

Branch: refs/heads/master
Commit: 10fc5f86fac066423c77d9b6d9e7ed87ab32ef01
Parents: 10b3e3e
Author: peay <p...@protonmail.com>
Authored: Sat Apr 29 11:31:15 2017 +0200
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Sun Apr 30 09:42:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 18 +++++++++-----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 26 +++++++++++++++++---
 2 files changed, 35 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b3591ce..8f94b8a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -291,17 +291,23 @@ public class KafkaIO {
       if (parameterizedType.getRawType() == Deserializer.class) {
         Type parameter = parameterizedType.getActualTypeArguments()[0];
 
+        @SuppressWarnings("unchecked")
+        Class<T> clazz = (Class<T>) parameter;
+
         try {
-          @SuppressWarnings("unchecked")
-          Class<T> clazz = (Class<T>) parameter;
           return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
         } catch (CannotProvideCoderException e) {
-          LOG.warn("Could not infer coder from deserializer type", e);
+          throw new RuntimeException(
+                  String.format("Unable to automatically infer a Coder for "
+                                + "the Kafka Deserializer %s: no coder registered for type %s",
+                                deserializer, clazz));
         }
       }
     }
 
-    throw new RuntimeException("Could not extract deserializer type from " + deserializer);
+    throw new RuntimeException(
+            String.format("Could not extract the Kafka Deserializer type from %s",
+                          deserializer));
   }
 
   /**
@@ -634,14 +640,14 @@ public class KafkaIO {
       Coder<K> keyCoder =
           checkNotNull(
               getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
-              "Key coder must be set");
+              "Key coder must be inferable from input or set using readWithCoders");
 
       Coder<V> valueCoder =
           checkNotNull(
               getValueCoder() != null
                   ? getValueCoder()
                   : inferCoder(registry, getValueDeserializer()),
-              "Value coder must be set");
+              "Value coder must be inferable from input or set using readWithCoders");
 
       // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =

http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index a9c318d..2f895fe 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -63,7 +63,6 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -605,7 +604,6 @@ public class KafkaIOTest {
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void testUnboundedSourceMetrics() {
     int numElements = 1000;
 
@@ -917,7 +915,24 @@ public class KafkaIOTest {
 
     @Override
     public void close() {
+    }
+  }
+
+  // class for testing coder inference
+  private static class ObjectDeserializer
+          implements Deserializer<Object> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
 
+    @Override
+    public Object deserialize(String topic, byte[] bytes) {
+      return new Object();
+    }
+
+    @Override
+    public void close() {
     }
   }
 
@@ -938,8 +953,13 @@ public class KafkaIOTest {
             instanceof VarLongCoder);
   }
 
+  @Test(expected = RuntimeException.class)
+  public void testInferKeyCoderFailure() {
+    CoderRegistry registry = CoderRegistry.createDefault();
+    KafkaIO.inferCoder(registry, ObjectDeserializer.class);
+  }
+
   @Test
-  @Category(NeedsRunner.class)
   public void testSinkMetrics() throws Exception {
     // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.
 

Reply via email to