This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 5fd434a [BEAM-12193] Add user metrics to show founded TopicPartition new baa106e Merge pull request #14570 from [BEAM-12193] Add user metrics to show founded TopicPartition 5fd434a is described below commit 5fd434a4e18089148efa86e63573ee455d940be2 Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Fri Apr 16 21:30:50 2021 -0700 [BEAM-12193] Add user metrics to show founded TopicPartition --- .../apache/beam/sdk/io/kafka/TopicPartitionCoder.java | 3 ++- .../sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java | 16 ++++++++++++++-- .../beam/sdk/io/kafka/TopicPartitionCoderTest.java | 6 ++++++ .../sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java | 18 ++++++------------ 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java index f11e8ca..4868dc2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoder.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.kafka.common.TopicPartition; /** The {@link Coder} for encoding and decoding {@link TopicPartition} in Beam. */ @@ -48,7 +49,7 @@ public class TopicPartitionCoder extends StructuredCoder<TopicPartition> { @Override public List<? extends Coder<?>> getCoderArguments() { - return null; + return ImmutableList.of(StringUtf8Coder.of(), VarIntCoder.of()); } @Override diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java index d82bfcf..fc9cc62 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFn.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; @@ -62,6 +64,8 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD private final Map<String, Object> kafkaConsumerConfig; private final Instant startReadTime; + private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition"; + WatchKafkaTopicPartitionDoFn( Duration checkDuration, SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn, @@ -85,6 +89,7 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD @VisibleForTesting Set<TopicPartition> getAllTopicPartitions() { Set<TopicPartition> current = new HashSet<>(); + // TODO(BEAM-12192): Respect given topics from KafkaIO. try (Consumer<byte[], byte[]> kafkaConsumer = kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) { for (Map.Entry<String, List<PartitionInfo>> topicInfo : @@ -107,13 +112,17 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD current.forEach( topicPartition -> { if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) { + Counter foundedTopicPartition = + Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString()); + foundedTopicPartition.inc(); existingTopicPartitions.add(topicPartition); outputReceiver.output( KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null)); } }); - timer.set(Instant.now().plus(checkDuration.getMillis())); + timer.offset(checkDuration).setRelative(); + ; } @OnTimer(TIMER_ID) @@ -130,13 +139,16 @@ class WatchKafkaTopicPartitionDoFn extends DoFn<KV<byte[], byte[]>, KafkaSourceD }); existingTopicPartitions.clear(); - Set<TopicPartition> currentAll = getAllTopicPartitions(); + Set<TopicPartition> currentAll = this.getAllTopicPartitions(); // Emit new added TopicPartitions. Set<TopicPartition> newAdded = Sets.difference(currentAll, readingTopicPartitions); newAdded.forEach( topicPartition -> { if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) { + Counter foundedTopicPartition = + Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString()); + foundedTopicPartition.inc(); outputReceiver.output( KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null)); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java index 01c5acd..55cd957 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/TopicPartitionCoderTest.java @@ -36,4 +36,10 @@ public class TopicPartitionCoderTest { assertEquals( topicPartition, coder.decode(new ByteArrayInputStream(outputStream.toByteArray()))); } + + @Test + public void testToString() throws Exception { + TopicPartitionCoder coder = new TopicPartitionCoder(); + assertEquals("TopicPartitionCoder(StringUtf8Coder,VarIntCoder)", coder.toString()); + } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java index 14460d6..dc3d6b5 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchKafkaTopicPartitionDoFnTest.java @@ -99,12 +99,10 @@ public class WatchKafkaTopicPartitionDoFnTest { when(mockConsumer.listTopics()).thenReturn(ImmutableMap.of()); MockBagState bagState = new MockBagState(ImmutableList.of()); - Instant now = Instant.EPOCH; - mockStatic(Instant.class); - when(Instant.now()).thenReturn(now); + when(timer.offset(Duration.millis(600L))).thenReturn(timer); dofnInstance.processElement(timer, bagState, outputReceiver); - verify(timer, times(1)).set(now.plus(600L)); + verify(timer, times(1)).setRelative(); assertTrue(outputReceiver.getOutputs().isEmpty()); assertTrue(bagState.getCurrentStates().isEmpty()); } @@ -129,13 +127,11 @@ public class WatchKafkaTopicPartitionDoFnTest { new PartitionInfo("topic2", 0, null, null, null), new PartitionInfo("topic2", 1, null, null, null)))); MockBagState bagState = new MockBagState(ImmutableList.of()); - Instant now = Instant.EPOCH; - mockStatic(Instant.class); - when(Instant.now()).thenReturn(now); + when(timer.offset(Duration.millis(600L))).thenReturn(timer); dofnInstance.processElement(timer, bagState, outputReceiver); - verify(timer, times(1)).set(now.plus(600L)); + verify(timer, times(1)).setRelative(); Set<TopicPartition> expectedOutputTopicPartitions = ImmutableSet.of( new TopicPartition("topic1", 0), @@ -182,13 +178,11 @@ public class WatchKafkaTopicPartitionDoFnTest { new PartitionInfo("topic2", 0, null, null, null), new PartitionInfo("topic2", 1, null, null, null)))); MockBagState bagState = new MockBagState(ImmutableList.of()); - Instant now = Instant.EPOCH; - mockStatic(Instant.class); - when(Instant.now()).thenReturn(now); + when(timer.offset(Duration.millis(600L))).thenReturn(timer); dofnInstance.processElement(timer, bagState, outputReceiver); + verify(timer, times(1)).setRelative(); - verify(timer, times(1)).set(now.plus(600L)); Set<TopicPartition> expectedOutputTopicPartitions = ImmutableSet.of( new TopicPartition("topic1", 0),