Repository: beam
Updated Branches:
  refs/heads/master 202aae9d3 -> 3d47b335c


[BEAM-2114] Fixed display data for Kafka read/write with coders


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10b3e3e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10b3e3e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10b3e3e7

Branch: refs/heads/master
Commit: 10b3e3e7391603e00a64933fe74b7748b58bc590
Parents: 202aae9
Author: peay <p...@protonmail.com>
Authored: Sat Apr 29 11:08:21 2017 +0200
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Sun Apr 30 09:39:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   9 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 102 +++++++++++++++++++
 2 files changed, 109 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 000df70..b3591ce 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -688,9 +688,11 @@ public class KafkaIO {
      */
     private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = 
ImmutableMap.of(
         ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer 
instead",
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer 
instead"
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer 
instead",
         // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
         //     lets allow these, applications can have better resume point for 
restarts.
+        CoderBasedKafkaDeserializer.configForKeyDeserializer(), "Use 
readWithCoders instead",
+        CoderBasedKafkaDeserializer.configForValueDeserializer(), "Use 
readWithCoders instead"
         );
 
     // set config defaults
@@ -1526,7 +1528,10 @@ public class KafkaIO {
      */
     private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = 
ImmutableMap.of(
         ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer 
instead",
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer 
instead"
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer 
instead",
+
+        CoderBasedKafkaSerializer.configForKeySerializer(), "Use 
writeWithCoders instead",
+        CoderBasedKafkaSerializer.configForValueSerializer(), "Use 
writeWithCoders instead"
      );
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index feb65da..a9c318d 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -266,6 +266,34 @@ public class KafkaIOTest {
     }
   }
 
+  /**
+   * Creates a consumer with two topics, with 10 partitions each.
+   * numElements are (round-robin) assigned all the 20 partitions.
+   */
+  private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
+          int numElements,
+          @Nullable SerializableFunction<KV<Integer, Long>, Instant> 
timestampFn) {
+
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    KafkaIO.Read<Integer, Long> reader = KafkaIO
+            .<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of())
+            .withBootstrapServers("myServer1:9092,myServer2:9092")
+            .withTopics(topics)
+            .withConsumerFactoryFn(new ConsumerFactoryFn(
+                    topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 
20 partitions
+            .withKeyDeserializer(IntegerDeserializer.class)
+            .withValueDeserializer(LongDeserializer.class)
+            .withMaxNumRecords(numElements);
+
+    if (timestampFn != null) {
+      return reader.withTimestampFn(timestampFn);
+    } else {
+      return reader;
+    }
+  }
+
+
   private static class AssertMultipleOf implements 
SerializableFunction<Iterable<Long>, Void> {
     private final int num;
 
@@ -316,6 +344,19 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceWithCoders() {
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+            .apply(mkKafkaReadTransformWithCoders(numElements, new 
ValueAsTimestampFn())
+                    .withoutMetadata())
+            .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
   public void testUnboundedSourceWithSingleTopic() {
     // same as testUnboundedSource, but with single topic
 
@@ -667,6 +708,39 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testSinkWithCoders() throws Exception {
+    // Simply read from kafka source and write to kafka sink. Then verify the 
records
+    // are correctly published to mock kafka producer.
+
+    int numElements = 1000;
+
+    synchronized (MOCK_PRODUCER_LOCK) {
+
+      MOCK_PRODUCER.clear();
+
+      ProducerSendCompletionThread completionThread = new 
ProducerSendCompletionThread().start();
+
+      String topic = "test";
+
+      p
+              .apply(mkKafkaReadTransform(numElements, new 
ValueAsTimestampFn())
+                      .withoutMetadata())
+              .apply(KafkaIO.<Integer, Long>writeWithCoders(VarIntCoder.of(), 
VarLongCoder.of())
+                      .withBootstrapServers("none")
+                      .withTopic(topic)
+                      .withKeySerializer(IntegerSerializer.class)
+                      .withValueSerializer(LongSerializer.class)
+                      .withProducerFactoryFn(new ProducerFactoryFn()));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      verifyProducerRecords(topic, numElements, false);
+    }
+  }
+
+  @Test
   public void testValuesSink() throws Exception {
     // similar to testSink(), but use values()' interface.
 
@@ -757,6 +831,19 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testSourceDisplayDataWithCoders() {
+    KafkaIO.Read<Integer, Long> read = mkKafkaReadTransformWithCoders(10, 
null);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
+    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", 
"myServer1:9092,myServer2:9092"));
+    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+  }
+
+  @Test
   public void testSourceWithExplicitPartitionsDisplayData() {
     KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
         .withBootstrapServers("myServer1:9092,myServer2:9092")
@@ -790,6 +877,21 @@ public class KafkaIOTest {
     assertThat(displayData, hasDisplayItem("bootstrap.servers", 
"myServerA:9092,myServerB:9092"));
     assertThat(displayData, hasDisplayItem("retries", 3));
   }
+  @Test
+  public void testSinkDisplayDataWithCoders() {
+    KafkaIO.Write<Integer, Long> write = KafkaIO
+            .<Integer, Long>writeWithCoders(VarIntCoder.of(), 
VarLongCoder.of())
+            .withBootstrapServers("myServerA:9092,myServerB:9092")
+            .withTopic("myTopic")
+            .withValueSerializer(LongSerializer.class)
+            .withProducerFactoryFn(new ProducerFactoryFn());
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", 
"myServerA:9092,myServerB:9092"));
+    assertThat(displayData, hasDisplayItem("retries", 3));
+  }
 
   // interface for testing coder inference
   private interface DummyInterface<T> {

Reply via email to