This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a349e765ff9 Collect metric of message size distribution in KafkaIO 
read. (#29443)
a349e765ff9 is described below

commit a349e765ff999a8cbafa251fd257963bc01f6fb6
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Nov 21 11:00:32 2023 -0500

    Collect metric of message size distribution in KafkaIO read. (#29443)
    
    * Collect metric of message size distribution in KafkaIO read.
    
    Tests on the metric are also added for legacy code path
    (invoked by `ReadFromKafkaViaUnbounded`) and the SDF code path
    (invoked by `ReadFromKafkaViaSDF`) of KafkaIO read.
    
    * Consolidate KafkaIO read metric name constants in one place
---
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    |  9 ++++
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       | 11 ++++
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 62 ++++++++++++++++++++++
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java   | 31 +++++++++++
 4 files changed, 113 insertions(+)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 6e6df42de8b..55c71384162 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.SourceMetrics;
@@ -217,6 +218,12 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
         pState.recordConsumed(offset, recordSize, offsetGap);
         bytesRead.inc(recordSize);
         bytesReadBySplit.inc(recordSize);
+
+        Distribution rawSizes =
+            Metrics.distribution(
+                METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + 
pState.topicPartition.toString());
+        rawSizes.update(recordSize);
+
         return true;
 
       } else { // -- (b)
@@ -309,6 +316,8 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
 
   @VisibleForTesting static final String METRIC_NAMESPACE = "KafkaIOReader";
 
+  @VisibleForTesting static final String RAW_SIZE_METRIC_PREFIX = "rawSize/";
+
   @VisibleForTesting
   static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = 
"checkpointMarkCommitsEnqueued";
 
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 4b0035aa356..1b6e3addce2 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -31,6 +31,8 @@ import 
org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
 import 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
@@ -203,6 +205,10 @@ abstract class ReadFromKafkaDoFn<K, V>
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
   @VisibleForTesting final Map<String, Object> consumerConfig;
+  @VisibleForTesting static final String METRIC_NAMESPACE = 
KafkaUnboundedReader.METRIC_NAMESPACE;
+
+  @VisibleForTesting
+  static final String RAW_SIZE_METRIC_PREFIX = 
KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;
 
   /**
    * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka 
{@link Consumer} to
@@ -362,6 +368,10 @@ abstract class ReadFromKafkaDoFn<K, V>
         Preconditions.checkStateNotNull(this.keyDeserializerInstance);
     final Deserializer<V> valueDeserializerInstance =
         Preconditions.checkStateNotNull(this.valueDeserializerInstance);
+    final Distribution rawSizes =
+        Metrics.distribution(
+            METRIC_NAMESPACE,
+            RAW_SIZE_METRIC_PREFIX + 
kafkaSourceDescriptor.getTopicPartition().toString());
     // Stop processing current TopicPartition when it's time to stop.
     if (checkStopReadingFn != null
         && 
checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {
@@ -437,6 +447,7 @@ abstract class ReadFromKafkaDoFn<K, V>
           avgRecordSize
               .getUnchecked(kafkaSourceDescriptor.getTopicPartition())
               .update(recordSize, rawRecord.offset() - expectedOffset);
+          rawSizes.update(recordSize);
           expectedOffset = rawRecord.offset() + 1;
           Instant outputTimestamp;
           // The outputTimestamp and watermark will be computed by 
timestampPolicy, where the
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 52ab3e20f79..aeb5818e913 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaIO.Read.FakeFlinkPipelineOptions;
 import org.apache.beam.sdk.io.kafka.KafkaMocks.PositionErrorConsumerFactory;
 import org.apache.beam.sdk.io.kafka.KafkaMocks.SendErrorProducerFactory;
+import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -1874,6 +1875,67 @@ public class KafkaIOTest {
     p.run();
   }
 
+  @Test
+  public void testUnboundedSourceRawSizeMetric() {
+    final String readStep = "readFromKafka";
+    final int numElements = 1000;
+    final int numPartitionsPerTopic = 10;
+    final int recordSize = 12; // The size of key and value is defined in 
ConsumerFactoryFn.
+
+    List<String> topics = ImmutableList.of("test");
+
+    KafkaIO.Read<byte[], Long> reader =
+        KafkaIO.<byte[], Long>read()
+            .withBootstrapServers("none")
+            .withTopicPartitions(
+                ImmutableList.of(new TopicPartition("test", 5), new 
TopicPartition("test", 8)))
+            .withConsumerFactoryFn(
+                new ConsumerFactoryFn(
+                    topics, numPartitionsPerTopic, numElements, 
OffsetResetStrategy.EARLIEST))
+            .withKeyDeserializer(ByteArrayDeserializer.class)
+            .withValueDeserializer(LongDeserializer.class)
+            .withMaxNumRecords(numElements / numPartitionsPerTopic * 2); // 2 
is the # of partitions
+
+    p.apply(readStep, reader.withoutMetadata()).apply(Values.create());
+
+    PipelineResult result = p.run();
+
+    MetricQueryResults metrics =
+        result
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(
+                        
MetricNameFilter.inNamespace(KafkaUnboundedReader.METRIC_NAMESPACE))
+                    .build());
+
+    assertThat(
+        metrics.getDistributions(),
+        hasItem(
+            attemptedMetricsResult(
+                KafkaUnboundedReader.METRIC_NAMESPACE,
+                KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX + "test-5",
+                readStep,
+                DistributionResult.create(
+                    recordSize * numElements / numPartitionsPerTopic,
+                    numElements / numPartitionsPerTopic,
+                    recordSize,
+                    recordSize))));
+
+    assertThat(
+        metrics.getDistributions(),
+        hasItem(
+            attemptedMetricsResult(
+                KafkaUnboundedReader.METRIC_NAMESPACE,
+                KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX + "test-8",
+                readStep,
+                DistributionResult.create(
+                    recordSize * numElements / numPartitionsPerTopic,
+                    numElements / numPartitionsPerTopic,
+                    recordSize,
+                    recordSize))));
+  }
+
   @Test
   public void testSourceDisplayData() {
     KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 854fd5ecea6..554c6d2fcaf 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -27,12 +27,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.beam.runners.core.metrics.DistributionCell;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -394,6 +399,32 @@ public class ReadFromKafkaDoFnTest {
         createExpectedRecords(descriptor, startOffset, 3, "key", "value"), 
receiver.getOutputs());
   }
 
+  @Test
+  public void testRawSizeMetric() throws Exception {
+    final int numElements = 1000;
+    final int recordSize = 8; // The size of key and value is defined in 
SimpleMockKafkaConsumer.
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+
+    MockOutputReceiver receiver = new MockOutputReceiver();
+    consumer.setNumOfRecordsPerPoll(numElements);
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, 
numElements));
+    KafkaSourceDescriptor descriptor =
+        KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
+    ProcessContinuation result = dofnInstance.processElement(descriptor, 
tracker, null, receiver);
+    assertEquals(ProcessContinuation.stop(), result);
+
+    DistributionCell d =
+        container.getDistribution(
+            MetricName.named(
+                ReadFromKafkaDoFn.METRIC_NAMESPACE,
+                ReadFromKafkaDoFn.RAW_SIZE_METRIC_PREFIX + topicPartition));
+
+    assertEquals(
+        d.getCumulative(),
+        DistributionData.create(recordSize * numElements, numElements, 
recordSize, recordSize));
+  }
+
   @Test
   public void testProcessElementWithEmptyPoll() throws Exception {
     MockOutputReceiver receiver = new MockOutputReceiver();

Reply via email to