This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a349e765ff9 Collect metric of message size distribution in KafkaIO
read. (#29443)
a349e765ff9 is described below
commit a349e765ff999a8cbafa251fd257963bc01f6fb6
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Nov 21 11:00:32 2023 -0500
Collect metric of message size distribution in KafkaIO read. (#29443)
* Collect metric of message size distribution in KafkaIO read.
Tests on the metric are also added for the legacy code path
(invoked by `ReadFromKafkaViaUnbounded`) and the SDF code path
(invoked by `ReadFromKafkaViaSDF`) of KafkaIO read.
* Consolidate KafkaIO read metric name constants in one place
---
.../beam/sdk/io/kafka/KafkaUnboundedReader.java | 9 ++++
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 11 ++++
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 62 ++++++++++++++++++++++
.../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 31 +++++++++++
4 files changed, 113 insertions(+)
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 6e6df42de8b..55c71384162 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
@@ -217,6 +218,12 @@ class KafkaUnboundedReader<K, V> extends
UnboundedReader<KafkaRecord<K, V>> {
pState.recordConsumed(offset, recordSize, offsetGap);
bytesRead.inc(recordSize);
bytesReadBySplit.inc(recordSize);
+
+ Distribution rawSizes =
+ Metrics.distribution(
+ METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX +
pState.topicPartition.toString());
+ rawSizes.update(recordSize);
+
return true;
} else { // -- (b)
@@ -309,6 +316,8 @@ class KafkaUnboundedReader<K, V> extends
UnboundedReader<KafkaRecord<K, V>> {
@VisibleForTesting static final String METRIC_NAMESPACE = "KafkaIOReader";
+ @VisibleForTesting static final String RAW_SIZE_METRIC_PREFIX = "rawSize/";
+
@VisibleForTesting
static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC =
"checkpointMarkCommitsEnqueued";
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 4b0035aa356..1b6e3addce2 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -31,6 +31,8 @@ import
org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
import
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
@@ -203,6 +205,10 @@ abstract class ReadFromKafkaDoFn<K, V>
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@VisibleForTesting final Map<String, Object> consumerConfig;
+ @VisibleForTesting static final String METRIC_NAMESPACE =
KafkaUnboundedReader.METRIC_NAMESPACE;
+
+ @VisibleForTesting
+ static final String RAW_SIZE_METRIC_PREFIX =
KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;
/**
* A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka
{@link Consumer} to
@@ -362,6 +368,10 @@ abstract class ReadFromKafkaDoFn<K, V>
Preconditions.checkStateNotNull(this.keyDeserializerInstance);
final Deserializer<V> valueDeserializerInstance =
Preconditions.checkStateNotNull(this.valueDeserializerInstance);
+ final Distribution rawSizes =
+ Metrics.distribution(
+ METRIC_NAMESPACE,
+ RAW_SIZE_METRIC_PREFIX +
kafkaSourceDescriptor.getTopicPartition().toString());
// Stop processing current TopicPartition when it's time to stop.
if (checkStopReadingFn != null
&&
checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {
@@ -437,6 +447,7 @@ abstract class ReadFromKafkaDoFn<K, V>
avgRecordSize
.getUnchecked(kafkaSourceDescriptor.getTopicPartition())
.update(recordSize, rawRecord.offset() - expectedOffset);
+ rawSizes.update(recordSize);
expectedOffset = rawRecord.offset() + 1;
Instant outputTimestamp;
// The outputTimestamp and watermark will be computed by
timestampPolicy, where the
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 52ab3e20f79..aeb5818e913 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read.FakeFlinkPipelineOptions;
import org.apache.beam.sdk.io.kafka.KafkaMocks.PositionErrorConsumerFactory;
import org.apache.beam.sdk.io.kafka.KafkaMocks.SendErrorProducerFactory;
+import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -1874,6 +1875,67 @@ public class KafkaIOTest {
p.run();
}
+ @Test
+ public void testUnboundedSourceRawSizeMetric() {
+ final String readStep = "readFromKafka";
+ final int numElements = 1000;
+ final int numPartitionsPerTopic = 10;
+ final int recordSize = 12; // The size of key and value is defined in
ConsumerFactoryFn.
+
+ List<String> topics = ImmutableList.of("test");
+
+ KafkaIO.Read<byte[], Long> reader =
+ KafkaIO.<byte[], Long>read()
+ .withBootstrapServers("none")
+ .withTopicPartitions(
+ ImmutableList.of(new TopicPartition("test", 5), new
TopicPartition("test", 8)))
+ .withConsumerFactoryFn(
+ new ConsumerFactoryFn(
+ topics, numPartitionsPerTopic, numElements,
OffsetResetStrategy.EARLIEST))
+ .withKeyDeserializer(ByteArrayDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
+ .withMaxNumRecords(numElements / numPartitionsPerTopic * 2); // 2
is the # of partitions
+
+ p.apply(readStep, reader.withoutMetadata()).apply(Values.create());
+
+ PipelineResult result = p.run();
+
+ MetricQueryResults metrics =
+ result
+ .metrics()
+ .queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(
+
MetricNameFilter.inNamespace(KafkaUnboundedReader.METRIC_NAMESPACE))
+ .build());
+
+ assertThat(
+ metrics.getDistributions(),
+ hasItem(
+ attemptedMetricsResult(
+ KafkaUnboundedReader.METRIC_NAMESPACE,
+ KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX + "test-5",
+ readStep,
+ DistributionResult.create(
+ recordSize * numElements / numPartitionsPerTopic,
+ numElements / numPartitionsPerTopic,
+ recordSize,
+ recordSize))));
+
+ assertThat(
+ metrics.getDistributions(),
+ hasItem(
+ attemptedMetricsResult(
+ KafkaUnboundedReader.METRIC_NAMESPACE,
+ KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX + "test-8",
+ readStep,
+ DistributionResult.create(
+ recordSize * numElements / numPartitionsPerTopic,
+ numElements / numPartitionsPerTopic,
+ recordSize,
+ recordSize))));
+ }
+
@Test
public void testSourceDisplayData() {
KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 854fd5ecea6..554c6d2fcaf 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -27,12 +27,17 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.beam.runners.core.metrics.DistributionCell;
+import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -394,6 +399,32 @@ public class ReadFromKafkaDoFnTest {
createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
receiver.getOutputs());
}
+ @Test
+ public void testRawSizeMetric() throws Exception {
+ final int numElements = 1000;
+ final int recordSize = 8; // The size of key and value is defined in
SimpleMockKafkaConsumer.
+ MetricsContainerImpl container = new MetricsContainerImpl("any");
+ MetricsEnvironment.setCurrentContainer(container);
+
+ MockOutputReceiver receiver = new MockOutputReceiver();
+ consumer.setNumOfRecordsPerPoll(numElements);
+ OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0,
numElements));
+ KafkaSourceDescriptor descriptor =
+ KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
+ ProcessContinuation result = dofnInstance.processElement(descriptor,
tracker, null, receiver);
+ assertEquals(ProcessContinuation.stop(), result);
+
+ DistributionCell d =
+ container.getDistribution(
+ MetricName.named(
+ ReadFromKafkaDoFn.METRIC_NAMESPACE,
+ ReadFromKafkaDoFn.RAW_SIZE_METRIC_PREFIX + topicPartition));
+
+ assertEquals(
+ d.getCumulative(),
+ DistributionData.create(recordSize * numElements, numElements,
recordSize, recordSize));
+ }
+
@Test
public void testProcessElementWithEmptyPoll() throws Exception {
MockOutputReceiver receiver = new MockOutputReceiver();