This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new fe48e338fa1 KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
fe48e338fa1 is described below

commit fe48e338fa17ee240575d66381415266154de33c
Author: Viktor Somogyi-Vass <[email protected]>
AuthorDate: Fri May 19 10:07:03 2023 +0200

    KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
    
    Reviewers: Chris Egerton <[email protected]>, Viktor Somogyi-Vass <[email protected]>
    
    Co-authored-by: Dániel Urbán <[email protected]>
---
 .../connect/runtime/ExactlyOnceWorkerSourceTask.java  |  1 +
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java      | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 58efaadc47c..cd905a355b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -231,6 +231,7 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
     @Override
     public void removeMetrics() {
         Utils.closeQuietly(transactionMetrics, "source task transaction 
metrics tracker");
+        super.removeMetrics();
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 35499a8c55c..70a0c246b85 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -86,6 +87,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptySet;
 import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -245,6 +247,23 @@ public class ExactlyOnceWorkerSourceTaskTest extends 
ThreadedTest {
                 sourceConfig, Runnable::run, preProducerCheck, 
postProducerCheck);
     }
 
+    @Test
+    public void testRemoveMetrics() {
+        createWorkerTask();
+
+        workerTask.removeMetrics();
+
+        assertEquals(emptySet(), 
filterToTaskMetrics(metrics.metrics().metrics().keySet()));
+    }
+
+    private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
+        return metricNames
+                .stream()
+                .filter(m -> 
metrics.registry().taskGroupName().equals(m.group())
+                        || 
metrics.registry().sourceTaskGroupName().equals(m.group()))
+                .collect(Collectors.toSet());
+    }
+
     @Test
     public void testStartPaused() throws Exception {
         final CountDownLatch pauseLatch = new CountDownLatch(1);

Reply via email to