Repository: beam
Updated Branches:
  refs/heads/master d84b06791 -> 3b3d6b81a
[BEAM-351] Add DisplayData to KafkaIO

Changes after review.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52e2d3a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52e2d3a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52e2d3a7

Branch: refs/heads/master
Commit: 52e2d3a77096460ae4a10ac977b4897a1eecf3a1
Parents: d84b067
Author: Aviem Zur <aviem...@gmail.com>
Authored: Sun Feb 26 22:39:28 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 19:28:26 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/kafka/pom.xml                      |  7 +++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 ++++++++++++++++++
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 53 +++++++++++++++++++-
 3 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 6935f1e..d5ffe63 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -149,6 +149,13 @@
     </dependency>

     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 5fd34b9..890fb2b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,12 +64,14 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
 import org.apache.beam.sdk.values.KV;
@@ -500,6 +503,27 @@ public class KafkaIO {
         return new KafkaConsumer<>(config);
       }
     };
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      List<String> topics = getTopics();
+      List<TopicPartition> topicPartitions = getTopicPartitions();
+      if (topics.size() > 0) {
+        builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
+      } else if (topicPartitions.size() > 0) {
+        builder.add(DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
+            .withLabel("Topic Partition/s"));
+      }
+      Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
+      for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
+        String key = conf.getKey();
+        if (!ignoredConsumerPropertiesKeys.contains(key)) {
+          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+        }
+      }
+    }
   }

   /**
@@ -527,6 +551,12 @@ public class KafkaIO {
         }
       }));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      read.populateDisplayData(builder);
+    }
   }

   ////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1222,6 +1252,19 @@ public class KafkaIO {
         configForKeySerializer(), "Reserved for internal serializer",
         configForValueSerializer(), "Reserved for internal serializer"
     );
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
+      Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
+      for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
+        String key = conf.getKey();
+        if (!ignoredProducerPropertiesKeys.contains(key)) {
+          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+        }
+      }
+    }
   }

   /**
@@ -1248,6 +1291,12 @@ public class KafkaIO {
           .setCoder(KvCoder.of(new NullOnlyCoder<K>(), kvWriteTransform.getValueCoder()))
           .apply(kvWriteTransform);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      kvWriteTransform.populateDisplayData(builder);
+    }
   }

   private static class NullOnlyCoder<T> extends AtomicCoder<T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/52e2d3a7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 9d7c27b..1897127 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.kafka;

+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -24,6 +25,7 @@ import static org.junit.Assert.assertTrue;

 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -58,6 +60,7 @@ import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -223,7 +226,7 @@ public class KafkaIOTest {
     List<String> topics = ImmutableList.of("topic_a", "topic_b");

     KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
-        .withBootstrapServers("none")
+        .withBootstrapServers("myServer1:9092,myServer2:9092")
         .withTopics(topics)
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
@@ -619,6 +622,54 @@ public class KafkaIOTest {
     }
   }

+  @Test
+  public void testSourceDisplayData() {
+    KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
+    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+  }
+
+  @Test
+  public void testSourceWithExplicitPartitionsDisplayData() {
+    KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
+        .withBootstrapServers("myServer1:9092,myServer2:9092")
+        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
+            new TopicPartition("test", 6)))
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
+        .withKeyCoder(ByteArrayCoder.of())
+        .withValueCoder(BigEndianLongCoder.of());
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
+    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+  }
+
+  @Test
+  public void testSinkDisplayData() {
+    KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
+        .withBootstrapServers("myServerA:9092,myServerB:9092")
+        .withTopic("myTopic")
+        .withValueCoder(BigEndianLongCoder.of())
+        .withProducerFactoryFn(new ProducerFactoryFn());
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+    assertThat(displayData, hasDisplayItem("retries", 3));
+  }
+
   private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {

     // verify that appropriate messages are written to kafka
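
----------------------------------------------------------------------

Reviewer note: populateDisplayData() is declared on HasDisplayData, which
PTransform implements; calling super.populateDisplayData(builder) first keeps
whatever the parent class registered, and DisplayData.from(transform) collects
the items for runners and UIs. The stand-alone sketch below is illustrative
only, not part of this commit: LoggingPassThrough, "topic" and "pollTimeoutMs"
are made-up names. It shows the same pattern the patch applies to KafkaIO's
Read and Write.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;

public class LoggingPassThrough
    extends PTransform<PCollection<String>, PCollection<String>> {

  private final String topic;      // optional, may be null
  private final int pollTimeoutMs;

  public LoggingPassThrough(String topic, int pollTimeoutMs) {
    this.topic = topic;
    this.pollTimeoutMs = pollTimeoutMs;
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // Identity ParDo; the transform exists only to demonstrate display data.
    return input.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element());
      }
    }));
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    // Delegate to super first so items registered by parent classes survive,
    // as the patch does in Read and Write.
    super.populateDisplayData(builder);
    // addIfNotNull() silently drops the item when its value is null, the
    // same trick Write uses above for its optional topic.
    builder.addIfNotNull(DisplayData.item("topic", topic).withLabel("Topic"));
    builder.add(DisplayData.item("pollTimeoutMs", pollTimeoutMs)
        .withLabel("Poll Timeout (ms)"));
  }
}

A test would assert on it the same way testSourceDisplayData() does: build
DisplayData.from(new LoggingPassThrough("myTopic", 1000)) and check
assertThat(displayData, hasDisplayItem("topic", "myTopic")).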