This is an automated email from the ASF dual-hosted git repository.
viktor pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new fe48e338fa1 KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
fe48e338fa1 is described below
commit fe48e338fa17ee240575d66381415266154de33c
Author: Viktor Somogyi-Vass <[email protected]>
AuthorDate: Fri May 19 10:07:03 2023 +0200
KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
Reviewers: Chris Egerton <[email protected]>, Viktor Somogyi-Vass <[email protected]>
Co-authored-by: Dániel Urbán <[email protected]>
---
.../connect/runtime/ExactlyOnceWorkerSourceTask.java | 1 +
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 19 +++++++++++++++++++
2 files changed, 20 insertions(+)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 58efaadc47c..cd905a355b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -231,6 +231,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
@Override
public void removeMetrics() {
Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker");
+ super.removeMetrics();
}
@Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 35499a8c55c..70a0c246b85 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -86,6 +87,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static java.util.Collections.emptySet;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -245,6 +247,23 @@ public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
}
+ @Test
+ public void testRemoveMetrics() {
+ createWorkerTask();
+
+ workerTask.removeMetrics();
+
+ assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet()));
+ }
+
+ private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
+ return metricNames
+ .stream()
+ .filter(m -> metrics.registry().taskGroupName().equals(m.group())
+ || metrics.registry().sourceTaskGroupName().equals(m.group()))
+ .collect(Collectors.toSet());
+ }
+
@Test
public void testStartPaused() throws Exception {
final CountDownLatch pauseLatch = new CountDownLatch(1);