This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4271fd8c8b1 KAFKA-19564: Close Consumer in ConsumerPerformance only
after metrics displayed (#20267)
4271fd8c8b1 is described below
commit 4271fd8c8b17cc7f31be027a7c25fd31ea11a7d7
Author: Kirk True <[email protected]>
AuthorDate: Wed Sep 3 01:25:21 2025 -0700
KAFKA-19564: Close Consumer in ConsumerPerformance only after metrics
displayed (#20267)
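Ensure that metrics are retrieved and displayed (when requested) before
`Consumer.close()` is called. This is important because metrics are
technically supposed to be removed on `Consumer.close()`, which means
retrieving them _after_ `close()` would yield an empty map. Previously
the metrics map was fetched before `close()` but printed only after it;
the consumer is now held in a try-with-resources block so that it is
closed only once the metrics have been printed. `run()` also accepts a
consumer factory so that the new test can supply a `MockConsumer`.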
Reviewers: Andrew Schofield <[email protected]>
---
.../apache/kafka/tools/ConsumerPerformance.java | 84 +++++++++++-----------
.../kafka/tools/ConsumerPerformanceTest.java | 21 ++++++
2 files changed, 63 insertions(+), 42 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 0892693801a..62def15d324 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -16,13 +16,12 @@
*/
package org.apache.kafka.tools;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
@@ -38,11 +37,11 @@ import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import java.util.regex.Pattern;
import joptsimple.OptionException;
@@ -55,6 +54,10 @@ public class ConsumerPerformance {
private static final Random RND = new Random();
public static void main(String[] args) {
+ run(args, KafkaConsumer::new);
+ }
+
+ static void run(String[] args, Function<Properties, Consumer<byte[],
byte[]>> consumerCreator) {
try {
LOG.info("Starting consumer...");
ConsumerPerfOptions options = new ConsumerPerfOptions(args);
@@ -66,45 +69,42 @@ public class ConsumerPerformance {
if (!options.hideHeader())
printHeader(options.showDetailedStats());
- KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(options.props());
- long bytesRead = 0L;
- long messagesRead = 0L;
- long lastBytesRead = 0L;
- long lastMessagesRead = 0L;
- long currentTimeMs = System.currentTimeMillis();
- long joinStartMs = currentTimeMs;
- long startMs = currentTimeMs;
- consume(consumer, options, totalMessagesRead, totalBytesRead,
joinTimeMs,
- bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
- joinStartMs, joinTimeMsInSingleRound);
- long endMs = System.currentTimeMillis();
-
- Map<MetricName, ? extends Metric> metrics = null;
- if (options.printMetrics())
- metrics = consumer.metrics();
- consumer.close();
-
- // print final stats
- double elapsedSec = (endMs - startMs) / 1_000.0;
- long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
- if (!options.showDetailedStats()) {
- double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 *
1024);
- System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f,
%.4f%n",
- options.dateFormat().format(startMs),
- options.dateFormat().format(endMs),
- totalMbRead,
- totalMbRead / elapsedSec,
- totalMessagesRead.get(),
- totalMessagesRead.get() / elapsedSec,
- joinTimeMs.get(),
- fetchTimeInMs,
- totalMbRead / (fetchTimeInMs / 1000.0),
- totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
- );
- }
+ try (Consumer<byte[], byte[]> consumer =
consumerCreator.apply(options.props())) {
+ long bytesRead = 0L;
+ long messagesRead = 0L;
+ long lastBytesRead = 0L;
+ long lastMessagesRead = 0L;
+ long currentTimeMs = System.currentTimeMillis();
+ long joinStartMs = currentTimeMs;
+ long startMs = currentTimeMs;
+ consume(consumer, options, totalMessagesRead, totalBytesRead,
joinTimeMs,
+ bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
+ joinStartMs, joinTimeMsInSingleRound);
+ long endMs = System.currentTimeMillis();
+
+ // print final stats
+ double elapsedSec = (endMs - startMs) / 1_000.0;
+ long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
+ if (!options.showDetailedStats()) {
+ double totalMbRead = (totalBytesRead.get() * 1.0) / (1024
* 1024);
+ System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d,
%.4f, %.4f%n",
+ options.dateFormat().format(startMs),
+ options.dateFormat().format(endMs),
+ totalMbRead,
+ totalMbRead / elapsedSec,
+ totalMessagesRead.get(),
+ totalMessagesRead.get() / elapsedSec,
+ joinTimeMs.get(),
+ fetchTimeInMs,
+ totalMbRead / (fetchTimeInMs / 1000.0),
+ totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
+ );
+ }
- if (metrics != null)
- ToolsUtils.printMetrics(metrics);
+ if (options.printMetrics()) {
+ ToolsUtils.printMetrics(consumer.metrics());
+ }
+ }
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
@@ -120,7 +120,7 @@ public class ConsumerPerformance {
System.out.printf("time, threadId, data.consumed.in.MB, MB.sec,
data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader);
}
- private static void consume(KafkaConsumer<byte[], byte[]> consumer,
+ private static void consume(Consumer<byte[], byte[]> consumer,
ConsumerPerfOptions options,
AtomicLong totalMessagesRead,
AtomicLong totalBytesRead,
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
index d78b65e54a3..df6c3a93966 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -16,8 +16,12 @@
*/
package org.apache.kafka.tools;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -30,6 +34,8 @@ import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
+import java.util.Properties;
+import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -167,6 +173,21 @@ public class ConsumerPerformanceTest {
assertEquals("perf-consumer-client",
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
+ @Test
+ public void testMetricsRetrievedBeforeConsumerClosed() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--messages", "0",
+ "--print-metrics"
+ };
+
+ Function<Properties, Consumer<byte[], byte[]>> consumerCreator =
properties -> new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name());
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
ConsumerPerformance.run(args, consumerCreator));
+ assertTrue(Utils.isBlank(err), "Should be no stderr message, but was
\"" + err + "\"");
+ }
+
private void testHeaderMatchContent(boolean detailed, int
expectedOutputLineCount, Runnable runnable) {
String out = ToolsTestUtils.captureStandardOut(() -> {
ConsumerPerformance.printHeader(detailed);