Repository: beam
Updated Branches:
  refs/heads/master 202aae9d3 -> 3d47b335c
[BEAM-2114] Fixed display data for Kafka read/write with coders

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10b3e3e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10b3e3e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10b3e3e7

Branch: refs/heads/master
Commit: 10b3e3e7391603e00a64933fe74b7748b58bc590
Parents: 202aae9
Author: peay <p...@protonmail.com>
Authored: Sat Apr 29 11:08:21 2017 +0200
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Sun Apr 30 09:39:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   9 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 102 +++++++++++++++++++
 2 files changed, 109 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 000df70..b3591ce 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -688,9 +688,11 @@ public class KafkaIO {
    */
   private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
-      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
+      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead",
       // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
       //     let's allow these, applications can have a better resume point for restarts.
+      CoderBasedKafkaDeserializer.configForKeyDeserializer(), "Use readWithCoders instead",
+      CoderBasedKafkaDeserializer.configForValueDeserializer(), "Use readWithCoders instead"
   );
 
   // set config defaults
@@ -1526,7 +1528,10 @@ public class KafkaIO {
    */
   private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
-      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead"
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead",
+
+      CoderBasedKafkaSerializer.configForKeySerializer(), "Use writeWithCoders instead",
+      CoderBasedKafkaSerializer.configForValueSerializer(), "Use writeWithCoders instead"
   );
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index feb65da..a9c318d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -266,6 +266,34 @@ public class KafkaIOTest {
     }
   }
 
+  /**
+   * Creates a consumer with two topics, with 10 partitions each.
+   * numElements are (round-robin) assigned to all the 20 partitions.
+   */
+  private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
+      int numElements,
+      @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
+
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    KafkaIO.Read<Integer, Long> reader = KafkaIO
+        .<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of())
+        .withBootstrapServers("myServer1:9092,myServer2:9092")
+        .withTopics(topics)
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
+        .withKeyDeserializer(IntegerDeserializer.class)
+        .withValueDeserializer(LongDeserializer.class)
+        .withMaxNumRecords(numElements);
+
+    if (timestampFn != null) {
+      return reader.withTimestampFn(timestampFn);
+    } else {
+      return reader;
+    }
+  }
+
+
   private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
 
     private final int num;
 
@@ -316,6 +344,19 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceWithCoders() {
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransformWithCoders(numElements, new ValueAsTimestampFn())
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
   public void testUnboundedSourceWithSingleTopic() {
     // same as testUnboundedSource, but with single topic
 
@@ -667,6 +708,39 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testSinkWithCoders() throws Exception {
+    // Simply read from kafka source and write to kafka sink. Then verify the records
+    // are correctly published to mock kafka producer.
+
+    int numElements = 1000;
+
+    synchronized (MOCK_PRODUCER_LOCK) {
+
+      MOCK_PRODUCER.clear();
+
+      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+
+      String topic = "test";
+
+      p
+          .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+              .withoutMetadata())
+          .apply(KafkaIO.<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
+              .withBootstrapServers("none")
+              .withTopic(topic)
+              .withKeySerializer(IntegerSerializer.class)
+              .withValueSerializer(LongSerializer.class)
+              .withProducerFactoryFn(new ProducerFactoryFn()));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      verifyProducerRecords(topic, numElements, false);
+    }
+  }
+
+  @Test
   public void testValuesSink() throws Exception {
     // similar to testSink(), but uses the values() interface.
@@ -757,6 +831,19 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testSourceDisplayDataWithCoders() {
+    KafkaIO.Read<Integer, Long> read = mkKafkaReadTransformWithCoders(10, null);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
+    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+  }
+
+  @Test
   public void testSourceWithExplicitPartitionsDisplayData() {
     KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
         .withBootstrapServers("myServer1:9092,myServer2:9092")
@@ -790,6 +877,21 @@ public class KafkaIOTest {
     assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
     assertThat(displayData, hasDisplayItem("retries", 3));
   }
+  @Test
+  public void testSinkDisplayDataWithCoders() {
+    KafkaIO.Write<Integer, Long> write = KafkaIO
+        .<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
+        .withBootstrapServers("myServerA:9092,myServerB:9092")
+        .withTopic("myTopic")
+        .withValueSerializer(LongSerializer.class)
+        .withProducerFactoryFn(new ProducerFactoryFn());
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+    assertThat(displayData, hasDisplayItem("retries", 3));
+  }
 
   // interface for testing coder inference
   private interface DummyInterface<T> {
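----------------------------------------------------------------------

For context, a minimal sketch of the coder-based entry points this commit
touches, mirroring testSourceDisplayDataWithCoders above. The class name,
broker address, and topic name are illustrative placeholders; only the
KafkaIO calls shown in the diff (readWithCoders, withBootstrapServers,
withTopics) are taken from the change itself. As the diff suggests, the
coder-backed (de)serializer entries stored in the consumer/producer config
previously leaked into the config-derived display data, so DisplayData.from
on a coder-based read or write was broken until those keys were added to the
ignored-properties maps.

import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.display.DisplayData;

public class KafkaCoderDisplayDataSketch {
  public static void main(String[] args) {
    // Coder-based source: key/value coders are supplied up front, so the
    // deserializer-related config entries are managed by KafkaIO itself and,
    // after this commit, are skipped when display data is populated.
    KafkaIO.Read<Integer, Long> read =
        KafkaIO.<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of())
            .withBootstrapServers("broker1:9092")       // placeholder address
            .withTopics(ImmutableList.of("my_topic"));  // placeholder topic

    // With the fix, this no longer trips over the coder entries in the
    // consumer config; it reports the usual items such as bootstrap.servers.
    DisplayData displayData = DisplayData.from(read);
    System.out.println(displayData);
  }
}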