This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 01ab888 KAFKA-13229: add total blocked time metric to streams
(KIP-761) (#11149)
01ab888 is described below
commit 01ab888dbd08ccd4b0de9333d21581ce24fe2c3b
Author: Rohan <[email protected]>
AuthorDate: Mon Aug 30 15:39:25 2021 -0700
KAFKA-13229: add total blocked time metric to streams (KIP-761) (#11149)
* Add the following producer metrics:
flush-time-total: cumulative sum of time elapsed during in flush.
txn-init-time-total: cumulative sum of time elapsed during in
initTransactions.
txn-begin-time-total: cumulative sum of time elapsed during in
beginTransaction.
txn-send-offsets-time-total: cumulative sum of time elapsed during in
sendOffsetsToTransaction.
txn-commit-time-total: cumulative sum of time elapsed during in
commitTransaction.
txn-abort-time-total: cumulative sum of time elapsed during in
abortTransaction.
* Add the following consumer metrics:
commited-time-total: cumulative sum of time elapsed during in committed.
commit-sync-time-total: cumulative sum of time elapsed during in commitSync.
* Add a total-blocked-time metric to streams that is the sum of:
consumer’s io-waittime-total
consumer’s iotime-total
consumer’s committed-time-total
consumer’s commit-sync-time-total
restore consumer’s io-waittime-total
restore consumer’s iotime-total
admin client’s io-waittime-total
admin client’s iotime-total
producer’s bufferpool-wait-time-total
producer's flush-time-total
producer's txn-init-time-total
producer's txn-begin-time-total
producer's txn-send-offsets-time-total
producer's txn-commit-time-total
producer's txn-abort-time-total
Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/consumer/KafkaConsumer.java | 7 +-
.../consumer/internals/KafkaConsumerMetrics.java | 33 ++++
.../kafka/clients/producer/KafkaProducer.java | 18 +++
.../producer/internals/KafkaProducerMetrics.java | 123 ++++++++++++++
.../producer/internals/SenderMetricsRegistry.java | 3 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 102 +++++++++++-
.../internals/KafkaConsumerMetricsTest.java | 76 +++++++++
.../kafka/clients/producer/KafkaProducerTest.java | 123 ++++++++++++++
.../internals/KafkaProducerMetricsTest.java | 117 ++++++++++++++
.../processor/internals/ActiveTaskCreator.java | 14 +-
.../processor/internals/GlobalStreamThread.java | 2 +
.../streams/processor/internals/StreamThread.java | 16 ++
.../internals/StreamThreadTotalBlockedTime.java | 59 +++++++
.../processor/internals/StreamsProducer.java | 46 +++++-
.../streams/processor/internals/TaskManager.java | 4 +
.../kafka/streams/processor/internals/Tasks.java | 4 +
.../internals/metrics/StreamsMetricsImpl.java | 40 +++++
.../processor/internals/metrics/ThreadMetrics.java | 29 ++++
.../integration/MetricsIntegrationTest.java | 4 +
.../processor/internals/ActiveTaskCreatorTest.java | 52 +++++-
.../processor/internals/RecordCollectorTest.java | 19 ++-
.../StreamThreadTotalBlockedTimeTest.java | 113 +++++++++++++
.../processor/internals/StreamsProducerTest.java | 177 ++++++++++++++++++---
.../internals/metrics/StreamsMetricsImplTest.java | 90 +++++++++++
.../internals/metrics/ThreadMetricsTest.java | 57 +++++++
.../streams/state/KeyValueStoreTestDriver.java | 4 +-
.../StreamThreadStateStoreProviderTest.java | 4 +-
.../apache/kafka/streams/TopologyTestDriver.java | 8 +-
29 files changed, 1302 insertions(+), 44 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5dd9187..bd53af1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -14,7 +14,7 @@
<suppress checks="NPathComplexity"
files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
<suppress checks="JavaNCSS"
- files="(ApiMessageType|FieldSpec|MessageDataGenerator).java"/>
+
files="(ApiMessageType|FieldSpec|MessageDataGenerator|KafkaConsumerTest).java"/>
<suppress checks="MethodLength"
files="(FieldSpec|MessageDataGenerator).java"/>
<suppress id="dontUseSystemExit"
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 29a9e37..286f84b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1485,6 +1485,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata>
offsets, final Duration timeout) {
acquireAndEnsureOpen();
+ long commitStart = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
offsets.forEach(this::updateLastSeenEpochIfNewer);
@@ -1493,6 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
"committing offsets " + offsets);
}
} finally {
+ kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() -
commitStart);
release();
}
}
@@ -1871,9 +1873,11 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final
Set<TopicPartition> partitions, final Duration timeout) {
acquireAndEnsureOpen();
+ long start = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
- Map<TopicPartition, OffsetAndMetadata> offsets =
coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+ final Map<TopicPartition, OffsetAndMetadata> offsets;
+ offsets = coordinator.fetchCommittedOffsets(partitions,
time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis()
+ "ms expired before the last " +
"committed offset for partitions " + partitions + " could
be determined. Try tuning default.api.timeout.ms " +
@@ -1883,6 +1887,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
return offsets;
}
} finally {
+ kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
release();
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
index 71332b8..0dc8a33 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import java.util.concurrent.TimeUnit;
@@ -29,6 +30,8 @@ public class KafkaConsumerMetrics implements AutoCloseable {
private final MetricName lastPollMetricName;
private final Sensor timeBetweenPollSensor;
private final Sensor pollIdleSensor;
+ private final Sensor committedSensor;
+ private final Sensor commitSyncSensor;
private final Metrics metrics;
private long lastPollMs;
private long pollStartMs;
@@ -63,6 +66,26 @@ public class KafkaConsumerMetrics implements AutoCloseable {
metricGroupName,
"The average fraction of time the consumer's poll() is idle as
opposed to waiting for the user code to process records."),
new Avg());
+
+ this.commitSyncSensor = metrics.sensor("commit-sync-time-ns-total");
+ this.commitSyncSensor.add(
+ metrics.metricName(
+ "commit-sync-time-ns-total",
+ metricGroupName,
+ "The total time the consumer has spent in commitSync in
nanoseconds"
+ ),
+ new CumulativeSum()
+ );
+
+ this.committedSensor = metrics.sensor("committed-time-ns-total");
+ this.committedSensor.add(
+ metrics.metricName(
+ "committed-time-ns-total",
+ metricGroupName,
+ "The total time the consumer has spent in committed in
nanoseconds"
+ ),
+ new CumulativeSum()
+ );
}
public void recordPollStart(long pollStartMs) {
@@ -78,10 +101,20 @@ public class KafkaConsumerMetrics implements AutoCloseable
{
this.pollIdleSensor.record(pollIdleRatio);
}
+ public void recordCommitSync(long duration) {
+ this.commitSyncSensor.record(duration);
+ }
+
+ public void recordCommitted(long duration) {
+ this.committedSensor.record(duration);
+ }
+
@Override
public void close() {
metrics.removeMetric(lastPollMetricName);
metrics.removeSensor(timeBetweenPollSensor.name());
metrics.removeSensor(pollIdleSensor.name());
+ metrics.removeSensor(commitSyncSensor.name());
+ metrics.removeSensor(committedSensor.name());
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ef8a9cc..756e300 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
@@ -241,6 +242,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final String clientId;
// Visible for testing
final Metrics metrics;
+ private final KafkaProducerMetrics producerMetrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
@@ -356,6 +358,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time,
metricsContext);
+ this.producerMetrics = new KafkaProducerMetrics(metrics);
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
@@ -590,9 +593,11 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
public void initTransactions() {
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long now = time.nanoseconds();
TransactionalRequestResult result =
transactionManager.initializeTransactions();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordInit(time.nanoseconds() - now);
}
/**
@@ -613,7 +618,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void beginTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long now = time.nanoseconds();
transactionManager.beginTransaction();
+ producerMetrics.recordBeginTxn(time.nanoseconds() - now);
}
/**
@@ -697,9 +704,11 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
throwIfInvalidGroupMetadata(groupMetadata);
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long start = time.nanoseconds();
TransactionalRequestResult result =
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
/**
@@ -730,9 +739,11 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
public void commitTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();
+ long commitStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordCommitTxn(time.nanoseconds() - commitStart);
}
/**
@@ -761,9 +772,11 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
throwIfNoTransactionManager();
throwIfProducerClosed();
log.info("Aborting incomplete transaction");
+ long abortStart = time.nanoseconds();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
+ producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}
/**
@@ -1124,12 +1137,16 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
@Override
public void flush() {
log.trace("Flushing accumulated records in producer.");
+
+ long start = time.nanoseconds();
this.accumulator.beginFlush();
this.sender.wakeup();
try {
this.accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
+ } finally {
+ producerMetrics.recordFlush(time.nanoseconds() - start);
}
}
@@ -1245,6 +1262,7 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
}
Utils.closeQuietly(interceptors, "producer interceptors",
firstException);
+ Utils.closeQuietly(producerMetrics, "producer metrics wrapper",
firstException);
Utils.closeQuietly(metrics, "producer metrics", firstException);
Utils.closeQuietly(keySerializer, "producer keySerializer",
firstException);
Utils.closeQuietly(valueSerializer, "producer valueSerializer",
firstException);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
new file mode 100644
index 0000000..b8ea762
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+
+import java.util.Map;
+
+public class KafkaProducerMetrics implements AutoCloseable {
+
+ public static final String GROUP = "producer-metrics";
+ private static final String FLUSH = "flush";
+ private static final String TXN_INIT = "txn-init";
+ private static final String TXN_BEGIN = "txn-begin";
+ private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+ private static final String TXN_COMMIT = "txn-commit";
+ private static final String TXN_ABORT = "txn-abort";
+ private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
+
+ private final Map<String, String> tags;
+ private final Metrics metrics;
+ private final Sensor initTimeSensor;
+ private final Sensor beginTxnTimeSensor;
+ private final Sensor flushTimeSensor;
+ private final Sensor sendOffsetsSensor;
+ private final Sensor commitTxnSensor;
+ private final Sensor abortTxnSensor;
+
+ public KafkaProducerMetrics(Metrics metrics) {
+ this.metrics = metrics;
+ tags = this.metrics.config().tags();
+ flushTimeSensor = newLatencySensor(
+ FLUSH,
+ "Total time producer has spent in flush in nanoseconds."
+ );
+ initTimeSensor = newLatencySensor(
+ TXN_INIT,
+ "Total time producer has spent in initTransactions in nanoseconds."
+ );
+ beginTxnTimeSensor = newLatencySensor(
+ TXN_BEGIN,
+ "Total time producer has spent in beginTransaction in nanoseconds."
+ );
+ sendOffsetsSensor = newLatencySensor(
+ TXN_SEND_OFFSETS,
+ "Total time producer has spent in sendOffsetsToTransaction."
+ );
+ commitTxnSensor = newLatencySensor(
+ TXN_COMMIT,
+ "Total time producer has spent in commitTransaction."
+ );
+ abortTxnSensor = newLatencySensor(
+ TXN_ABORT,
+ "Total time producer has spent in abortTransaction."
+ );
+ }
+
+ @Override
+ public void close() {
+ removeMetric(FLUSH);
+ removeMetric(TXN_INIT);
+ removeMetric(TXN_BEGIN);
+ removeMetric(TXN_SEND_OFFSETS);
+ removeMetric(TXN_COMMIT);
+ removeMetric(TXN_ABORT);
+ }
+
+ public void recordFlush(long duration) {
+ flushTimeSensor.record(duration);
+ }
+
+ public void recordInit(long duration) {
+ initTimeSensor.record(duration);
+ }
+
+ public void recordBeginTxn(long duration) {
+ beginTxnTimeSensor.record(duration);
+ }
+
+ public void recordSendOffsets(long duration) {
+ sendOffsetsSensor.record(duration);
+ }
+
+ public void recordCommitTxn(long duration) {
+ commitTxnSensor.record(duration);
+ }
+
+ public void recordAbortTxn(long duration) {
+ abortTxnSensor.record(duration);
+ }
+
+ private Sensor newLatencySensor(String name, String description) {
+ Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
+ sensor.add(metricName(name, description), new CumulativeSum());
+ return sensor;
+ }
+
+ private MetricName metricName(final String name, final String description)
{
+ return metrics.metricName(name + TOTAL_TIME_SUFFIX, GROUP,
description, tags);
+ }
+
+ private void removeMetric(final String name) {
+ metrics.removeSensor(name + TOTAL_TIME_SUFFIX);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
index 6438973..2ad2cba 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.Sensor;
public class SenderMetricsRegistry {
- final static String METRIC_GROUP_NAME = "producer-metrics";
final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics";
private final List<MetricNameTemplate> allTemplates;
@@ -154,7 +153,7 @@ public class SenderMetricsRegistry {
}
private MetricName createMetricName(String name, String description) {
- return this.metrics.metricInstance(createTemplate(name,
METRIC_GROUP_NAME, description, this.tags));
+ return this.metrics.metricInstance(createTemplate(name,
KafkaProducerMetrics.GROUP, description, this.tags));
}
private MetricNameTemplate createTopicTemplate(String name, String
description) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 57cd942..bc7d506 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -34,6 +34,7 @@ import
org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -1926,6 +1927,96 @@ public class KafkaConsumerTest {
}
@Test
+ public void testMeasureCommitSyncDurationOnFailure() {
+ final KafkaConsumer<String, String> consumer
+ = consumerWithPendingError(new
MockTime(Duration.ofSeconds(1).toMillis()));
+
+ try {
+ consumer.commitSync(Collections.singletonMap(tp0, new
OffsetAndMetadata(10L)));
+ } catch (final RuntimeException e) {
+ }
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("commit-sync-time-ns-total",
"consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >=
Duration.ofMillis(999).toNanos());
+ }
+
+ @Test
+ public void testMeasureCommitSyncDuration() {
+ Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+ SubscriptionState subscription = new SubscriptionState(new
LogContext(),
+ OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+ initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
+ ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+ KafkaConsumer<String, String> consumer = newConsumer(time, client,
subscription, metadata,
+ assignor, true, groupInstanceId);
+ consumer.assign(singletonList(tp0));
+
+ client.prepareResponseFrom(
+ FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId,
node), node);
+ Node coordinator = new Node(Integer.MAX_VALUE - node.id(),
node.host(), node.port());
+ client.prepareResponseFrom(
+ offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE)),
+ coordinator
+ );
+
+ consumer.commitSync(Collections.singletonMap(tp0, new
OffsetAndMetadata(10L)));
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("commit-sync-time-ns-total",
"consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >=
Duration.ofMillis(999).toNanos());
+ }
+
+ @Test
+ public void testMeasureCommittedDurationOnFailure() {
+ final KafkaConsumer<String, String> consumer
+ = consumerWithPendingError(new
MockTime(Duration.ofSeconds(1).toMillis()));
+
+ try {
+ consumer.committed(Collections.singleton(tp0));
+ } catch (final RuntimeException e) {
+ }
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("committed-time-ns-total",
"consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >=
Duration.ofMillis(999).toNanos());
+ }
+
+ @Test
+ public void testMeasureCommittedDuration() {
+ long offset1 = 10000;
+ Time time = new MockTime(Duration.ofSeconds(1).toMillis());
+ SubscriptionState subscription = new SubscriptionState(new
LogContext(),
+ OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+ initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
+ ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+ KafkaConsumer<String, String> consumer = newConsumer(time, client,
subscription, metadata,
+ assignor, true, groupInstanceId);
+ consumer.assign(singletonList(tp0));
+
+ // lookup coordinator
+ client.prepareResponseFrom(
+ FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId,
node), node);
+ Node coordinator = new Node(Integer.MAX_VALUE - node.id(),
node.host(), node.port());
+
+ // fetch offset for one topic
+ client.prepareResponseFrom(
+ offsetResponse(Collections.singletonMap(tp0, offset1),
Errors.NONE), coordinator);
+
+ consumer.committed(Collections.singleton(tp0)).get(tp0).offset();
+
+ final Metric metric = consumer.metrics()
+ .get(consumer.metrics.metricName("committed-time-ns-total",
"consumer-metrics"));
+ assertTrue((Double) metric.metricValue() >=
Duration.ofMillis(999).toNanos());
+ }
+
+ @Test
public void testRebalanceException() {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
@@ -2247,8 +2338,7 @@ public class KafkaConsumerTest {
consumer.close(Duration.ZERO);
}
- private KafkaConsumer<String, String>
consumerWithPendingAuthenticationError() {
- Time time = new MockTime();
+ private KafkaConsumer<String, String>
consumerWithPendingAuthenticationError(final Time time) {
SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -2262,6 +2352,14 @@ public class KafkaConsumerTest {
return newConsumer(time, client, subscription, metadata, assignor,
false, groupInstanceId);
}
+ private KafkaConsumer<String, String>
consumerWithPendingAuthenticationError() {
+ return consumerWithPendingAuthenticationError(new MockTime());
+ }
+
+ private KafkaConsumer<String, String> consumerWithPendingError(final Time
time) {
+ return consumerWithPendingAuthenticationError(time);
+ }
+
private ConsumerRebalanceListener getConsumerRebalanceListener(final
KafkaConsumer<String, String> consumer) {
return new ConsumerRebalanceListener() {
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
new file mode 100644
index 0000000..087f90b
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class KafkaConsumerMetricsTest {
+ private static final long METRIC_VALUE = 123L;
+ private static final String CONSUMER_GROUP_PREFIX = "consumer";
+ private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
+ private static final String COMMIT_SYNC_TIME_TOTAL =
"commit-sync-time-ns-total";
+ private static final String COMMITTED_TIME_TOTAL =
"committed-time-ns-total";
+
+ private final Metrics metrics = new Metrics();
+ private final KafkaConsumerMetrics consumerMetrics
+ = new KafkaConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX);
+
+ @Test
+ public void shouldRecordCommitSyncTime() {
+ // When:
+ consumerMetrics.recordCommitSync(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(COMMIT_SYNC_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordCommittedTime() {
+ // When:
+ consumerMetrics.recordCommitted(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(COMMITTED_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRemoveMetricsOnClose() {
+ // When:
+ consumerMetrics.close();
+
+ // Then:
+ assertMetricRemoved(COMMIT_SYNC_TIME_TOTAL);
+ assertMetricRemoved(COMMITTED_TIME_TOTAL);
+ }
+
+ private void assertMetricRemoved(final String name) {
+ assertNull(metrics.metric(metrics.metricName(name,
CONSUMER_METRIC_GROUP)));
+ }
+
+ private void assertMetricValue(final String name) {
+ assertEquals(
+ metrics.metric(metrics.metricName(name,
CONSUMER_METRIC_GROUP)).metricValue(),
+ (double) METRIC_VALUE
+ );
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2784f19..c48c3fe 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -42,6 +43,7 @@ import
org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.Selectable;
@@ -813,6 +815,41 @@ public class KafkaProducerTest {
}
}
+ private static Double getMetricValue(final KafkaProducer<?, ?> producer,
final String name) {
+ Metrics metrics = producer.metrics;
+ Metric metric = metrics.metric(metrics.metricName(name,
"producer-metrics"));
+ return (Double) metric.metricValue();
+ }
+
+ @Test
+ public void testFlushMeasureLatency() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse =
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(
+ configs,
+ new StringSerializer(),
+ new StringSerializer(),
+ metadata,
+ client,
+ null,
+ time
+ )) {
+ producer.flush();
+ double first = getMetricValue(producer, "flush-time-ns-total");
+ assertTrue(first > 0);
+ producer.flush();
+ assertTrue(getMetricValue(producer, "flush-time-ns-total") >
first);
+ }
+ }
+
@Test
public void testMetricConfigRecordingLevel() {
Properties props = new Properties();
@@ -952,6 +989,36 @@ public class KafkaProducerTest {
}
@Test
+ public void testMeasureAbortTransactionDuration() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse =
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5,
Errors.NONE));
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(configs,
new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.initTransactions();
+
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ producer.abortTransaction();
+ double first = getMetricValue(producer, "txn-abort-time-ns-total");
+ assertTrue(first > 0);
+
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ producer.abortTransaction();
+ assertTrue(getMetricValue(producer, "txn-abort-time-ns-total") >
first);
+ }
+ }
+
+ @Test
public void testSendTxnOffsetsWithGroupId() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
@@ -988,6 +1055,62 @@ public class KafkaProducerTest {
}
}
+ private void assertDurationAtLeast(KafkaProducer<?, ?> producer, String
name, double floor) {
+ getAndAssertDurationAtLeast(producer, name, floor);
+ }
+
+ private double getAndAssertDurationAtLeast(KafkaProducer<?, ?> producer,
String name, double floor) {
+ double value = getMetricValue(producer, name);
+ assertTrue(value > floor);
+ return value;
+ }
+
+ @Test
+ public void testMeasureTransactionDurations() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ Duration tick = Duration.ofSeconds(1);
+ Time time = new MockTime(tick.toMillis());
+ MetadataResponse initialUpdateResponse =
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"some.id", host1));
+ client.prepareResponse(initProducerIdResponse(1L, (short) 5,
Errors.NONE));
+
+ try (KafkaProducer<String, String> producer = kafkaProducer(configs,
new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.initTransactions();
+ assertDurationAtLeast(producer, "txn-init-time-ns-total",
tick.toNanos());
+
+ client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"some.id", host1));
+
client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(
+ new TopicPartition("topic", 0), Errors.NONE)));
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ double beginFirst = getAndAssertDurationAtLeast(producer,
"txn-begin-time-ns-total", tick.toNanos());
+ producer.sendOffsetsToTransaction(Collections.emptyMap(), new
ConsumerGroupMetadata("group"));
+ double sendOffFirst = getAndAssertDurationAtLeast(producer,
"txn-send-offsets-time-ns-total", tick.toNanos());
+ producer.commitTransaction();
+ double commitFirst = getAndAssertDurationAtLeast(producer,
"txn-commit-time-ns-total", tick.toNanos());
+
+ client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
+
client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(
+ new TopicPartition("topic", 0), Errors.NONE)));
+ client.prepareResponse(endTxnResponse(Errors.NONE));
+ producer.beginTransaction();
+ assertDurationAtLeast(producer, "txn-begin-time-ns-total",
beginFirst + tick.toNanos());
+ producer.sendOffsetsToTransaction(Collections.emptyMap(), new
ConsumerGroupMetadata("group"));
+ assertDurationAtLeast(producer, "txn-send-offsets-time-ns-total",
sendOffFirst + tick.toNanos());
+ producer.commitTransaction();
+ assertDurationAtLeast(producer, "txn-commit-time-ns-total",
commitFirst + tick.toNanos());
+ }
+ }
+
@Test
public void testSendTxnOffsetsWithGroupMetadata() {
final short maxVersion = (short) 3;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
new file mode 100644
index 0000000..e068861
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetricsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class KafkaProducerMetricsTest {
+ private static final long METRIC_VALUE = 123L;
+ private static final String FLUSH_TIME_TOTAL = "flush-time-ns-total";
+ private static final String TXN_INIT_TIME_TOTAL = "txn-init-time-ns-total";
+ private static final String TXN_BEGIN_TIME_TOTAL =
"txn-begin-time-ns-total";
+ private static final String TXN_COMMIT_TIME_TOTAL =
"txn-commit-time-ns-total";
+ private static final String TXN_ABORT_TIME_TOTAL =
"txn-abort-time-ns-total";
+ private static final String TXN_SEND_OFFSETS_TIME_TOTAL =
"txn-send-offsets-time-ns-total";
+
+ private final Metrics metrics = new Metrics();
+ private final KafkaProducerMetrics producerMetrics = new
KafkaProducerMetrics(metrics);
+
+ @Test
+ public void shouldRecordFlushTime() {
+ // When:
+ producerMetrics.recordFlush(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(FLUSH_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordInitTime() {
+ // When:
+ producerMetrics.recordInit(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_INIT_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordTxBeginTime() {
+ // When:
+ producerMetrics.recordBeginTxn(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_BEGIN_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordTxCommitTime() {
+ // When:
+ producerMetrics.recordCommitTxn(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_COMMIT_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordTxAbortTime() {
+ // When:
+ producerMetrics.recordAbortTxn(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_ABORT_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRecordSendOffsetsTime() {
+ // When:
+ producerMetrics.recordSendOffsets(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue(TXN_SEND_OFFSETS_TIME_TOTAL);
+ }
+
+ @Test
+ public void shouldRemoveMetricsOnClose() {
+ // When:
+ producerMetrics.close();
+
+ // Then:
+ assertMetricRemoved(FLUSH_TIME_TOTAL);
+ assertMetricRemoved(TXN_INIT_TIME_TOTAL);
+ assertMetricRemoved(TXN_BEGIN_TIME_TOTAL);
+ assertMetricRemoved(TXN_COMMIT_TIME_TOTAL);
+ assertMetricRemoved(TXN_ABORT_TIME_TOTAL);
+ assertMetricRemoved(TXN_SEND_OFFSETS_TIME_TOTAL);
+ }
+
+ private void assertMetricRemoved(final String name) {
+ assertNull(metrics.metric(metrics.metricName(name,
KafkaProducerMetrics.GROUP)));
+ }
+
+ private void assertMetricValue(final String name) {
+ assertEquals(
+ metrics.metric(metrics.metricName(name,
KafkaProducerMetrics.GROUP)).metricValue(),
+ (double) METRIC_VALUE
+ );
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 06171d3..2ed3bd8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -110,7 +110,8 @@ class ActiveTaskCreator {
clientSupplier,
null,
processId,
- logContext);
+ logContext,
+ time);
taskProducers = Collections.emptyMap();
}
}
@@ -243,7 +244,8 @@ class ActiveTaskCreator {
clientSupplier,
taskId,
null,
- logContext);
+ logContext,
+ time);
taskProducers.put(taskId, streamsProducer);
} else {
streamsProducer = threadProducer;
@@ -326,4 +328,12 @@ class ActiveTaskCreator {
return new LogContext(logPrefix);
}
+ public double totalProducerBlockedTime() {
+ if (threadProducer != null) {
+ return threadProducer.totalBlockedTime();
+ }
+ return taskProducers.values().stream()
+ .mapToDouble(StreamsProducer::totalBlockedTime)
+ .sum();
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index e55c5d7..f45350d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -296,6 +296,7 @@ public class GlobalStreamThread extends Thread {
log.warn("Error happened during initialization of the global state
store; this thread has shutdown");
streamsMetrics.removeAllThreadLevelSensors(getName());
+ streamsMetrics.removeAllThreadLevelMetrics(getName());
return;
}
@@ -338,6 +339,7 @@ public class GlobalStreamThread extends Thread {
}
streamsMetrics.removeAllThreadLevelSensors(getName());
+ streamsMetrics.removeAllThreadLevelMetrics(getName());
setState(DEAD);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eb3fe79..a1da373 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -512,6 +512,21 @@ public class StreamThread extends Thread {
ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
+ ThreadMetrics.addThreadStartTimeMetric(
+ threadId,
+ streamsMetrics,
+ time.milliseconds()
+ );
+ ThreadMetrics.addThreadBlockedTimeMetric(
+ threadId,
+ new StreamThreadTotalBlockedTime(
+ mainConsumer,
+ restoreConsumer,
+ taskManager::totalProducerBlockedTime
+ ),
+ streamsMetrics
+ );
+
this.time = time;
this.topologyMetadata = topologyMetadata;
this.logPrefix = logContext.logPrefix();
@@ -1127,6 +1142,7 @@ public class StreamThread extends Thread {
log.error("Failed to close restore consumer due to the following
error:", e);
}
streamsMetrics.removeAllThreadLevelSensors(getName());
+ streamsMetrics.removeAllThreadLevelMetrics(getName());
setState(State.DEAD);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
new file mode 100644
index 0000000..cf37633
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTime.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamThreadTotalBlockedTime {
+ private final Consumer<?, ?> consumer;
+ private final Consumer<?, ?> restoreConsumer;
+ private final Supplier<Double> producerTotalBlockedTime;
+
+ StreamThreadTotalBlockedTime(
+ final Consumer<?, ?> consumer,
+ final Consumer<?, ?> restoreConsumer,
+ final Supplier<Double> producerTotalBlockedTime) {
+ this.consumer = consumer;
+ this.restoreConsumer = restoreConsumer;
+ this.producerTotalBlockedTime = producerTotalBlockedTime;
+ }
+
+ private double metricValue(
+ final Map<MetricName, ? extends Metric> metrics,
+ final String name) {
+ return metrics.keySet().stream()
+ .filter(n -> n.name().equals(name))
+ .findFirst()
+ .map(n -> (Double) metrics.get(n).metricValue())
+ .orElse(0.0);
+ }
+
+ public double compute() {
+ return metricValue(consumer.metrics(), "io-waittime-total")
+ + metricValue(consumer.metrics(), "iotime-total")
+ + metricValue(consumer.metrics(), "committed-time-ns-total")
+ + metricValue(consumer.metrics(), "commit-sync-time-ns-total")
+ + metricValue(restoreConsumer.metrics(), "io-waittime-total")
+ + metricValue(restoreConsumer.metrics(), "iotime-total")
+ + producerTotalBlockedTime.get();
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 8655f01..23ee0b1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -35,6 +36,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -67,22 +69,26 @@ public class StreamsProducer {
private final Map<String, Object> eosV2ProducerConfigs;
private final KafkaClientSupplier clientSupplier;
private final StreamThread.ProcessingMode processingMode;
+ private final Time time;
private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
+ private double oldProducerTotalBlockedTime = 0;
public StreamsProducer(final StreamsConfig config,
final String threadId,
final KafkaClientSupplier clientSupplier,
final TaskId taskId,
final UUID processId,
- final LogContext logContext) {
+ final LogContext logContext,
+ final Time time) {
Objects.requireNonNull(config, "config cannot be null");
Objects.requireNonNull(threadId, "threadId cannot be null");
this.clientSupplier = Objects.requireNonNull(clientSupplier,
"clientSupplier cannot be null");
log = Objects.requireNonNull(logContext, "logContext cannot be
null").logger(getClass());
logPrefix = logContext.logPrefix().trim();
+ this.time = Objects.requireNonNull(time, "time");
processingMode = StreamThread.processingMode(config);
@@ -178,12 +184,50 @@ public class StreamsProducer {
throw new IllegalStateException("Expected eos-v2 to be enabled,
but the processing mode was " + processingMode);
}
+ oldProducerTotalBlockedTime += totalBlockedTime(producer);
+ final long start = time.nanoseconds();
producer.close();
+ final long closeTime = time.nanoseconds() - start;
+ oldProducerTotalBlockedTime += closeTime;
producer = clientSupplier.getProducer(eosV2ProducerConfigs);
transactionInitialized = false;
}
+ private double getMetricValue(final Map<MetricName, ? extends Metric>
metrics,
+ final String name) {
+ final List<MetricName> found = metrics.keySet().stream()
+ .filter(n -> n.name().equals(name))
+ .collect(Collectors.toList());
+ if (found.isEmpty()) {
+ return 0.0;
+ }
+ if (found.size() > 1) {
+ final String err = String.format(
+ "found %d values for metric %s. total blocked time computation
may be incorrect",
+ found.size(),
+ name
+ );
+ log.error(err);
+ throw new IllegalStateException(err);
+ }
+ return (Double) metrics.get(found.get(0)).metricValue();
+ }
+
+ private double totalBlockedTime(final Producer<?, ?> producer) {
+ return getMetricValue(producer.metrics(), "bufferpool-wait-time-total")
+ + getMetricValue(producer.metrics(), "flush-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-init-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-begin-time-ns-total")
+ + getMetricValue(producer.metrics(),
"txn-send-offsets-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-commit-time-ns-total")
+ + getMetricValue(producer.metrics(), "txn-abort-time-ns-total");
+ }
+
+ public double totalBlockedTime() {
+ return oldProducerTotalBlockedTime + totalBlockedTime(producer);
+ }
+
private void maybeBeginTransaction() {
if (eosEnabled() && !transactionInFlight) {
try {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 24b90a6..9269c9d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -124,6 +124,10 @@ public class TaskManager {
tasks.setMainConsumer(mainConsumer);
}
+ public double totalProducerBlockedTime() {
+ return tasks.totalProducerBlockedTime();
+ }
+
public UUID processId() {
return processId;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 35056ff..96c0ee1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -101,6 +101,10 @@ class Tasks {
);
}
+ double totalProducerBlockedTime() {
+ return activeTaskCreator.totalProducerBlockedTime();
+ }
+
void createTasks(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate) {
for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated :
activeTasksToCreate.entrySet()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 9a36898..dea2399 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -91,6 +91,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
private final Deque<String> clientLevelSensors = new LinkedList<>();
+ private final Map<String, Deque<MetricName>> threadLevelMetrics = new
HashMap<>();
private final Map<String, Deque<String>> threadLevelSensors = new
HashMap<>();
private final Map<String, Deque<String>> taskLevelSensors = new
HashMap<>();
private final Map<String, Deque<String>> nodeLevelSensors = new
HashMap<>();
@@ -200,6 +201,36 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
+ public <T> void addThreadLevelImmutableMetric(final String name,
+ final String description,
+ final String threadId,
+ final T value) {
+ final MetricName metricName = metrics.metricName(
+ name, THREAD_LEVEL_GROUP, description,
threadLevelTagMap(threadId));
+ synchronized (threadLevelMetrics) {
+ threadLevelMetrics.computeIfAbsent(
+ threadSensorPrefix(threadId),
+ tid -> new LinkedList<>()
+ ).add(metricName);
+ metrics.addMetric(metricName, new ImmutableMetricValue<>(value));
+ }
+ }
+
+ public <T> void addThreadLevelMutableMetric(final String name,
+ final String description,
+ final String threadId,
+ final Gauge<T> valueProvider) {
+ final MetricName metricName = metrics.metricName(
+ name, THREAD_LEVEL_GROUP, description,
threadLevelTagMap(threadId));
+ synchronized (threadLevelMetrics) {
+ threadLevelMetrics.computeIfAbsent(
+ threadSensorPrefix(threadId),
+ tid -> new LinkedList<>()
+ ).add(metricName);
+ metrics.addMetric(metricName, valueProvider);
+ }
+ }
+
public final Sensor clientLevelSensor(final String sensorName,
final RecordingLevel recordingLevel,
final Sensor... parents) {
@@ -271,6 +302,15 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
+ public final void removeAllThreadLevelMetrics(final String threadId) {
+ synchronized (threadLevelMetrics) {
+ final Deque<MetricName> names =
threadLevelMetrics.remove(threadSensorPrefix(threadId));
+ while (names != null && !names.isEmpty()) {
+ metrics.removeMetric(names.pop());
+ }
+ }
+ }
+
public Map<String, String> taskLevelTagMap(final String threadId, final
String taskId) {
final Map<String, String> tagMap = threadLevelTagMap(threadId);
tagMap.put(TASK_ID_TAG, taskId);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index 28cb10f..8912f6e5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import java.util.Map;
@@ -45,6 +46,8 @@ public class ThreadMetrics {
private static final String CREATE_TASK = "task-created";
private static final String CLOSE_TASK = "task-closed";
private static final String SKIP_RECORD = "skipped-records";
+ private static final String BLOCKED_TIME = "blocked-time-ns-total";
+ private static final String THREAD_START_TIME = "thread-start-time";
private static final String COMMIT_DESCRIPTION = "calls to commit";
private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION +
COMMIT_DESCRIPTION;
@@ -91,6 +94,10 @@ public class ThreadMetrics {
"The fraction of time the thread spent on polling records from
consumer";
private static final String COMMIT_RATIO_DESCRIPTION =
"The fraction of time the thread spent on committing all tasks";
+ private static final String BLOCKED_TIME_DESCRIPTION =
+ "The total time the thread spent blocked on kafka";
+ private static final String THREAD_START_TIME_DESCRIPTION =
+ "The time that the thread was started";
public static Sensor createTaskSensor(final String threadId,
final StreamsMetricsImpl
streamsMetrics) {
@@ -310,6 +317,28 @@ public class ThreadMetrics {
return sensor;
}
+ public static void addThreadStartTimeMetric(final String threadId,
+ final StreamsMetricsImpl
streamsMetrics,
+ final long startTime) {
+ streamsMetrics.addThreadLevelImmutableMetric(
+ THREAD_START_TIME,
+ THREAD_START_TIME_DESCRIPTION,
+ threadId,
+ startTime
+ );
+ }
+
+ public static void addThreadBlockedTimeMetric(final String threadId,
+ final
StreamThreadTotalBlockedTime blockedTime,
+ final StreamsMetricsImpl
streamsMetrics) {
+ streamsMetrics.addThreadLevelMutableMetric(
+ BLOCKED_TIME,
+ BLOCKED_TIME_DESCRIPTION,
+ threadId,
+ (config, now) -> blockedTime.compute()
+ );
+ }
+
private static Sensor invocationRateAndCountSensor(final String threadId,
final String metricName,
final String
descriptionOfRate,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 84f5cfc..9ada60f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -191,6 +191,8 @@ public class MetricsIntegrationTest {
private static final String TASK_CREATED_TOTAL = "task-created-total";
private static final String TASK_CLOSED_RATE = "task-closed-rate";
private static final String TASK_CLOSED_TOTAL = "task-closed-total";
+ private static final String BLOCKED_TIME_TOTAL = "blocked-time-ns-total";
+ private static final String THREAD_START_TIME = "thread-start-time";
private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
@@ -503,6 +505,8 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricThread, TASK_CREATED_TOTAL, NUM_THREADS);
checkMetricByName(listMetricThread, TASK_CLOSED_RATE, NUM_THREADS);
checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, NUM_THREADS);
+ checkMetricByName(listMetricThread, BLOCKED_TIME_TOTAL, NUM_THREADS);
+ checkMetricByName(listMetricThread, THREAD_START_TIME, NUM_THREADS);
}
private void checkTaskLevelMetrics() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index b689dc1..74d81bd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
@@ -55,6 +56,7 @@ import static org.easymock.EasyMock.reset;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThrows;
@@ -117,6 +119,16 @@ public class ActiveTaskCreatorTest {
assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
}
+ @Test
+ public void shouldReturnBlockedTimeWhenThreadProducer() {
+ final double blockedTime = 123.0;
+ createTasks();
+ final MockProducer<?, ?> producer =
mockClientSupplier.producers.get(0);
+ addMetric(producer, "flush-time-ns-total", blockedTime);
+
+ assertThat(activeTaskCreator.totalProducerBlockedTime(),
closeTo(blockedTime, 0.01));
+ }
+
// error handling
@Test
@@ -224,6 +236,23 @@ public class ActiveTaskCreatorTest {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
}
+ @SuppressWarnings("deprecation")
+ @Test
+ public void shouldReturnBlockedTimeWhenTaskProducers() {
+ properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE);
+ mockClientSupplier.setApplicationIdForProducer("appId");
+ createTasks();
+ double total = 0.0;
+ double blocked = 1.0;
+ for (final MockProducer<?, ?> producer : mockClientSupplier.producers)
{
+ addMetric(producer, "flush-time-ns-total", blocked);
+ total += blocked;
+ blocked += 1.0;
+ }
+
+ assertThat(activeTaskCreator.totalProducerBlockedTime(),
closeTo(total, 0.01));
+ }
+
// error handling
@SuppressWarnings("deprecation")
@@ -289,7 +318,6 @@ public class ActiveTaskCreatorTest {
}
-
// eos-v2 test
// functional test
@@ -488,4 +516,26 @@ public class ActiveTaskCreatorTest {
equalTo(mkSet(task00, task01))
);
}
+
+ private void addMetric(
+ final MockProducer<?, ?> producer,
+ final String name,
+ final double value) {
+ final MetricName metricName = metricName(name);
+ producer.setMockMetrics(metricName, new Metric() {
+ @Override
+ public MetricName metricName() {
+ return metricName;
+ }
+
+ @Override
+ public Object metricValue() {
+ return value;
+ }
+ });
+ }
+
+ private MetricName metricName(final String name) {
+ return new MetricName(name, "", "", Collections.emptyMap());
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 5a83c68..48364f2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import
org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -133,7 +134,8 @@ public class RecordCollectorTest {
clientSupplier,
null,
processId,
- logContext
+ logContext,
+ Time.SYSTEM
);
mockProducer = clientSupplier.producers.get(0);
collector = new RecordCollectorImpl(
@@ -792,7 +794,8 @@ public class RecordCollectorTest {
},
taskId,
processId,
- logContext
+ logContext,
+ Time.SYSTEM
),
productionExceptionHandler,
streamsMetrics
@@ -823,7 +826,8 @@ public class RecordCollectorTest {
},
null,
null,
- logContext
+ logContext,
+ Time.SYSTEM
),
productionExceptionHandler,
streamsMetrics
@@ -857,7 +861,8 @@ public class RecordCollectorTest {
},
taskId,
processId,
- logContext
+ logContext,
+ Time.SYSTEM
),
productionExceptionHandler,
streamsMetrics
@@ -895,7 +900,8 @@ public class RecordCollectorTest {
},
null,
null,
- logContext
+ logContext,
+ Time.SYSTEM
);
}
@@ -916,7 +922,8 @@ public class RecordCollectorTest {
},
null,
null,
- logContext
+ logContext,
+ Time.SYSTEM
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java
new file mode 100644
index 0000000..c2f3f39
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTotalBlockedTimeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class StreamThreadTotalBlockedTimeTest {
+ private static final int IOTIME_TOTAL = 1;
+ private static final int IO_WATTIME_TOTAL = 2;
+ private static final int COMMITTED_TIME_TOTAL = 3;
+ private static final int COMMIT_SYNC_TIME_TOTAL = 4;
+ private static final int RESTORE_IOTIME_TOTAL = 5;
+ private static final int RESTORE_IO_WAITTIME_TOTAL = 6;
+ private static final double PRODUCER_BLOCKED_TIME = 7.0;
+
+ @Mock
+ Consumer<?, ?> consumer;
+ @Mock
+ Consumer<?, ?> restoreConsumer;
+ @Mock
+ Supplier<Double> producerBlocked;
+
+ private StreamThreadTotalBlockedTime blockedTime;
+
+ @Rule
+ public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Before
+ public void setup() {
+ blockedTime = new StreamThreadTotalBlockedTime(consumer,
restoreConsumer, producerBlocked);
+ when(consumer.metrics()).thenAnswer(a -> new MetricsBuilder()
+ .addMetric("iotime-total", IOTIME_TOTAL)
+ .addMetric("io-waittime-total", IO_WATTIME_TOTAL)
+ .addMetric("committed-time-ns-total", COMMITTED_TIME_TOTAL)
+ .addMetric("commit-sync-time-ns-total", COMMIT_SYNC_TIME_TOTAL)
+ .build()
+ );
+ when(restoreConsumer.metrics()).thenAnswer(a -> new MetricsBuilder()
+ .addMetric("iotime-total", RESTORE_IOTIME_TOTAL)
+ .addMetric("io-waittime-total", RESTORE_IO_WAITTIME_TOTAL)
+ .build()
+ );
+ when(producerBlocked.get()).thenReturn(PRODUCER_BLOCKED_TIME);
+ }
+
+ @Test
+ public void shouldComputeTotalBlockedTime() {
+ assertThat(
+ blockedTime.compute(),
+ equalTo(IOTIME_TOTAL + IO_WATTIME_TOTAL + COMMITTED_TIME_TOTAL
+ + COMMIT_SYNC_TIME_TOTAL + RESTORE_IOTIME_TOTAL +
RESTORE_IO_WAITTIME_TOTAL
+ + PRODUCER_BLOCKED_TIME)
+ );
+ }
+
+ private static class MetricsBuilder {
+ private final HashMap<MetricName, Metric> metrics = new HashMap<>();
+
+ private MetricsBuilder addMetric(final String name, final double
value) {
+ final MetricName metricName = new MetricName(name, "", "",
Collections.emptyMap());
+ metrics.put(
+ metricName,
+ new Metric() {
+ @Override
+ public MetricName metricName() {
+ return metricName;
+ }
+
+ @Override
+ public Object metricValue() {
+ return value;
+ }
+ }
+ );
+ return this;
+ }
+
+ public Map<MetricName, ? extends Metric> build() {
+ return Collections.unmodifiableMap(metrics);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index f4dec89..5e074bf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -33,6 +35,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -57,6 +60,8 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertSame;
@@ -64,6 +69,13 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class StreamsProducerTest {
+ private static final double BUFFER_POOL_WAIT_TIME = 1;
+ private static final double FLUSH_TME = 2;
+ private static final double TXN_INIT_TIME = 3;
+ private static final double TXN_BEGIN_TIME = 4;
+ private static final double TXN_SEND_OFFSETS_TIME = 5;
+ private static final double TXN_COMMIT_TIME = 6;
+ private static final double TXN_ABORT_TIME = 7;
private final LogContext logContext = new LogContext("test ");
private final String topic = "topic";
@@ -93,6 +105,8 @@ public class StreamsProducerTest {
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2))
);
+ private final Time mockTime = mock(Time.class);
+
final Producer<byte[], byte[]> mockedProducer = mock(Producer.class);
final KafkaClientSupplier clientSupplier = new MockClientSupplier() {
@Override
@@ -106,7 +120,8 @@ public class StreamsProducerTest {
clientSupplier,
null,
null,
- logContext
+ logContext,
+ mockTime
);
final StreamsProducer eosAlphaStreamsProducerWithMock = new
StreamsProducer(
eosAlphaConfig,
@@ -114,7 +129,8 @@ public class StreamsProducerTest {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
private final MockClientSupplier mockClientSupplier = new
MockClientSupplier();
@@ -136,8 +152,6 @@ public class StreamsProducerTest {
mkEntry(new TopicPartition(topic, 0), new OffsetAndMetadata(0L, null))
);
-
-
@Before
public void before() {
mockClientSupplier.setCluster(cluster);
@@ -148,7 +162,8 @@ public class StreamsProducerTest {
mockClientSupplier,
null,
null,
- logContext
+ logContext,
+ mockTime
);
nonEosMockProducer = mockClientSupplier.producers.get(0);
@@ -161,7 +176,8 @@ public class StreamsProducerTest {
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
eosAlphaStreamsProducer.initTransaction();
eosAlphaMockProducer = eosAlphaMockClientSupplier.producers.get(0);
@@ -175,10 +191,13 @@ public class StreamsProducerTest {
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
eosBetaStreamsProducer.initTransaction();
eosBetaMockProducer = eosBetaMockClientSupplier.producers.get(0);
+
expect(mockTime.nanoseconds()).andAnswer(Time.SYSTEM::nanoseconds).anyTimes();
+ replay(mockTime);
}
@@ -251,7 +270,8 @@ public class StreamsProducerTest {
mockClientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("config cannot be null"));
@@ -267,7 +287,8 @@ public class StreamsProducerTest {
mockClientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("threadId cannot be null"));
@@ -283,7 +304,8 @@ public class StreamsProducerTest {
null,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("clientSupplier cannot be null"));
@@ -299,7 +321,8 @@ public class StreamsProducerTest {
mockClientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- null)
+ null,
+ mockTime)
);
assertThat(thrown.getMessage(), is("logContext cannot be null"));
@@ -343,7 +366,8 @@ public class StreamsProducerTest {
mockClientSupplier,
null,
null,
- logContext
+ logContext,
+ mockTime
);
}
@@ -462,7 +486,8 @@ public class StreamsProducerTest {
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
verify(mockMap);
@@ -489,7 +514,8 @@ public class StreamsProducerTest {
eosAlphaMockClientSupplier,
null,
processId,
- logContext
+ logContext,
+ mockTime
);
verify(mockMap);
@@ -612,7 +638,8 @@ public class StreamsProducerTest {
clientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
streamsProducer.initTransaction();
// call `send()` to start a transaction
@@ -665,7 +692,8 @@ public class StreamsProducerTest {
mockClientSupplier,
null,
UUID.randomUUID(),
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("taskId cannot be null for
exactly-once alpha"));
@@ -681,7 +709,8 @@ public class StreamsProducerTest {
mockClientSupplier,
new TaskId(0, 0),
null,
- logContext)
+ logContext,
+ mockTime)
);
assertThat(thrown.getMessage(), is("processId cannot be null for
exactly-once v2"));
@@ -704,7 +733,8 @@ public class StreamsProducerTest {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final TimeoutException thrown = assertThrows(
@@ -724,7 +754,8 @@ public class StreamsProducerTest {
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final IllegalStateException thrown = assertThrows(
@@ -744,7 +775,8 @@ public class StreamsProducerTest {
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
final IllegalStateException thrown = assertThrows(
@@ -772,7 +804,8 @@ public class StreamsProducerTest {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final StreamsException thrown = assertThrows(
@@ -801,7 +834,8 @@ public class StreamsProducerTest {
clientSupplier,
new TaskId(0, 0),
null,
- logContext
+ logContext,
+ mockTime
);
final RuntimeException thrown = assertThrows(
@@ -1105,7 +1139,8 @@ public class StreamsProducerTest {
clientSupplier,
null,
UUID.randomUUID(),
- logContext
+ logContext,
+ mockTime
);
streamsProducer.initTransaction();
@@ -1113,6 +1148,7 @@ public class StreamsProducerTest {
mockedProducer.close();
mockedProducer.initTransactions();
expectLastCall();
+
expect(mockedProducer.metrics()).andReturn(Collections.emptyMap()).anyTimes();
replay(mockedProducer);
streamsProducer.resetProducer();
@@ -1121,4 +1157,99 @@ public class StreamsProducerTest {
verify(mockedProducer);
}
+ @Test
+ public void shouldComputeTotalBlockedTime() {
+ setProducerMetrics(
+ nonEosMockProducer,
+ BUFFER_POOL_WAIT_TIME,
+ FLUSH_TME,
+ TXN_INIT_TIME,
+ TXN_BEGIN_TIME,
+ TXN_SEND_OFFSETS_TIME,
+ TXN_COMMIT_TIME,
+ TXN_ABORT_TIME
+ );
+
+ final double expectedTotalBlocked = BUFFER_POOL_WAIT_TIME + FLUSH_TME
+ TXN_INIT_TIME +
+ TXN_BEGIN_TIME + TXN_SEND_OFFSETS_TIME + TXN_COMMIT_TIME +
TXN_ABORT_TIME;
+ assertThat(nonEosStreamsProducer.totalBlockedTime(),
closeTo(expectedTotalBlocked, 0.01));
+ }
+
+ @Test
+ public void shouldComputeTotalBlockedTimeAfterReset() {
+ setProducerMetrics(
+ eosBetaMockProducer,
+ BUFFER_POOL_WAIT_TIME,
+ FLUSH_TME,
+ TXN_INIT_TIME,
+ TXN_BEGIN_TIME,
+ TXN_SEND_OFFSETS_TIME,
+ TXN_COMMIT_TIME,
+ TXN_ABORT_TIME
+ );
+ final double expectedTotalBlocked = BUFFER_POOL_WAIT_TIME + FLUSH_TME
+ TXN_INIT_TIME +
+ TXN_BEGIN_TIME + TXN_SEND_OFFSETS_TIME + TXN_COMMIT_TIME +
TXN_ABORT_TIME;
+ assertThat(eosBetaStreamsProducer.totalBlockedTime(),
equalTo(expectedTotalBlocked));
+ reset(mockTime);
+ final long closeStart = 1L;
+ final long clodeDelay = 1L;
+
expect(mockTime.nanoseconds()).andReturn(closeStart).andReturn(closeStart +
clodeDelay);
+ replay(mockTime);
+ eosBetaStreamsProducer.resetProducer();
+ setProducerMetrics(
+ eosBetaMockClientSupplier.producers.get(1),
+ BUFFER_POOL_WAIT_TIME,
+ FLUSH_TME,
+ TXN_INIT_TIME,
+ TXN_BEGIN_TIME,
+ TXN_SEND_OFFSETS_TIME,
+ TXN_COMMIT_TIME,
+ TXN_ABORT_TIME
+ );
+
+ assertThat(
+ eosBetaStreamsProducer.totalBlockedTime(),
+ closeTo(2 * expectedTotalBlocked + clodeDelay, 0.01)
+ );
+ }
+
+ private MetricName metricName(final String name) {
+ return new MetricName(name, "", "", Collections.emptyMap());
+ }
+
+ private void addMetric(
+ final MockProducer<?, ?> producer,
+ final String name,
+ final double value) {
+ final MetricName metricName = metricName(name);
+ producer.setMockMetrics(metricName, new Metric() {
+ @Override
+ public MetricName metricName() {
+ return metricName;
+ }
+
+ @Override
+ public Object metricValue() {
+ return value;
+ }
+ });
+ }
+
+ private void setProducerMetrics(
+ final MockProducer<?, ?> producer,
+ final double bufferPoolWaitTime,
+ final double flushTime,
+ final double txnInitTime,
+ final double txnBeginTime,
+ final double txnSendOffsetsTime,
+ final double txnCommitTime,
+ final double txnAbortTime) {
+ addMetric(producer, "bufferpool-wait-time-total", bufferPoolWaitTime);
+ addMetric(producer, "flush-time-ns-total", flushTime);
+ addMetric(producer, "txn-init-time-ns-total", txnInitTime);
+ addMetric(producer, "txn-begin-time-ns-total", txnBeginTime);
+ addMetric(producer, "txn-send-offsets-time-ns-total",
txnSendOffsetsTime);
+ addMetric(producer, "txn-commit-time-ns-total", txnCommitTime);
+ addMetric(producer, "txn-abort-time-ns-total", txnAbortTime);
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index bfe05a6..24cf8c7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -57,6 +57,7 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
@@ -75,6 +76,7 @@ import static org.hamcrest.CoreMatchers.equalToObject;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -1209,4 +1211,92 @@ public class StreamsMetricsImplTest {
verify(sensor);
}
+
+ @Test
+ public void shouldAddThreadLevelMutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+ streamsMetrics.addThreadLevelMutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ (c, t) -> measuredValue
+ );
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), notNullValue());
+ assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+ }
+
+ @Test
+ public void shouldCleanupThreadLevelMutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+ streamsMetrics.addThreadLevelMutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ (c, t) -> measuredValue
+ );
+
+ streamsMetrics.removeAllThreadLevelMetrics("t1");
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), nullValue());
+ }
+
+ @Test
+ public void shouldAddThreadLevelImmutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+
+ streamsMetrics.addThreadLevelImmutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ measuredValue
+ );
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), notNullValue());
+ assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+ }
+
+ @Test
+ public void shouldCleanupThreadLevelImmutableMetric() {
+ final int measuredValue = 123;
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, VERSION, time);
+ streamsMetrics.addThreadLevelImmutableMetric(
+ "foobar",
+ "test metric",
+ "t1",
+ measuredValue
+ );
+
+ streamsMetrics.removeAllThreadLevelMetrics("t1");
+
+ final MetricName name = metrics.metricName(
+ "foobar",
+ THREAD_LEVEL_GROUP,
+ Collections.singletonMap("thread-id", "t1")
+ );
+ assertThat(metrics.metric(name), nullValue());
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index ae0eae4..0a486db 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -16,14 +16,20 @@
*/
package org.apache.kafka.streams.processor.internals.metrics;
+import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import
org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
+import org.mockito.ArgumentCaptor;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
@@ -387,4 +393,55 @@ public class ThreadMetricsTest {
assertThat(sensor, is(expectedSensor));
}
+
+ @Test
+ public void shouldAddThreadStartTimeMetric() {
+ // Given:
+ final long startTime = 123L;
+
+ // When:
+ ThreadMetrics.addThreadStartTimeMetric(
+ "bongo",
+ streamsMetrics,
+ startTime
+ );
+
+ // Then:
+ verify(streamsMetrics).addThreadLevelImmutableMetric(
+ "thread-start-time",
+ "The time that the thread was started",
+ "bongo",
+ startTime
+ );
+ }
+
+ @Test
+ public void shouldAddTotalBlockedTimeMetric() {
+ // Given:
+ final double startTime = 123.45;
+ final StreamThreadTotalBlockedTime blockedTime =
mock(StreamThreadTotalBlockedTime.class);
+ when(blockedTime.compute()).thenReturn(startTime);
+
+ // When:
+ ThreadMetrics.addThreadBlockedTimeMetric(
+ "burger",
+ blockedTime,
+ streamsMetrics
+ );
+
+ // Then:
+ final ArgumentCaptor<Gauge<Double>> captor = gaugeCaptor();
+ verify(streamsMetrics).addThreadLevelMutableMetric(
+ eq("blocked-time-ns-total"),
+ eq("The total time the thread spent blocked on kafka"),
+ eq("burger"),
+ captor.capture()
+ );
+ assertThat(captor.getValue().value(null, 678L), is(startTime));
+ }
+
+ @SuppressWarnings("unchecked")
+ private ArgumentCaptor<Gauge<Double>> gaugeCaptor() {
+ return ArgumentCaptor.forClass(Gauge.class);
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 3f438f9..6a95ccb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -208,7 +209,8 @@ public class KeyValueStoreTestDriver<K, V> {
new MockClientSupplier(),
null,
null,
- logContext),
+ logContext,
+ Time.SYSTEM),
new DefaultProductionExceptionHandler(),
new MockStreamsMetrics(new Metrics())
) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c8a320f..c56f7bc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
@@ -423,7 +424,8 @@ public class StreamThreadStateStoreProviderTest {
clientSupplier,
new TaskId(0, 0),
UUID.randomUUID(),
- logContext
+ logContext,
+ Time.SYSTEM
),
streamsConfig.defaultProductionExceptionHandler(),
new MockStreamsMetrics(metrics));
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c73ab3e..05f10e9 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -362,7 +362,8 @@ public class TopologyTestDriver implements Closeable {
throw new IllegalStateException();
}
},
- logContext
+ logContext,
+ mockWallClockTime
);
setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics,
cache);
@@ -1334,8 +1335,9 @@ public class TopologyTestDriver implements Closeable {
public TestDriverProducer(final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
- final LogContext logContext) {
- super(config, "TopologyTestDriver-StreamThread-1", clientSupplier,
new TaskId(0, 0), UUID.randomUUID(), logContext);
+ final LogContext logContext,
+ final Time time) {
+ super(config, "TopologyTestDriver-StreamThread-1", clientSupplier,
new TaskId(0, 0), UUID.randomUUID(), logContext, time);
}
@Override