This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ca05da95c33 KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)
ca05da95c33 is described below

commit ca05da95c337f051e721464588d1b6678e8f2afb
Author: kumarpritam863 <148938310+kumarpritam...@users.noreply.github.com>
AuthorDate: Mon Nov 6 22:02:05 2023 +0530

    KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630)
    
    Reviewers: Chris Egerton <chr...@aiven.io>
---
 .../kafka/clients/consumer/MockConsumer.java       |  6 ++--
 .../kafka/clients/consumer/MockConsumerTest.java   |  3 ++
 .../kafka/connect/runtime/WorkerSinkTask.java      | 18 +++++++----
 .../connect/runtime/WorkerSinkTaskContext.java     |  6 ++--
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 36 +++++++++++++++++++---
 5 files changed, 52 insertions(+), 17 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 53c13e4b98b..126d5eff02a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -104,15 +104,13 @@ public class MockConsumer<K, V> implements Consumer<K, V> 
{
 
         // rebalance
         this.records.clear();
-        this.subscriptions.assignFromSubscribed(newAssignment);
 
         // rebalance callbacks
-        if (!added.isEmpty()) {
-            this.subscriptions.rebalanceListener().ifPresent(crl -> 
crl.onPartitionsAssigned(added));
-        }
         if (!removed.isEmpty()) {
             this.subscriptions.rebalanceListener().ifPresent(crl -> 
crl.onPartitionsRevoked(removed));
         }
+        this.subscriptions.assignFromSubscribed(newAssignment);
+        this.subscriptions.rebalanceListener().ifPresent(crl -> 
crl.onPartitionsAssigned(added));
     }
 
     @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 17247ba79b1..c03be92fb9a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -154,6 +154,9 @@ public class MockConsumerTest {
 
             @Override
             public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                if (partitions.isEmpty()) {
+                    return;
+                }
                 assigned.clear();
                 assigned.addAll(partitions);
             }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 4d400260081..0f2e8f6eb27 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -19,9 +19,9 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
@@ -85,7 +85,7 @@ class WorkerSinkTask extends WorkerTask {
     private final TransformationChain<SinkRecord> transformationChain;
     private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
     private final boolean isTopicTrackingEnabled;
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final Consumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
     private final List<SinkRecord> messageBatch;
     private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
@@ -114,7 +114,7 @@ class WorkerSinkTask extends WorkerTask {
                           ErrorHandlingMetrics errorMetrics,
                           HeaderConverter headerConverter,
                           TransformationChain<SinkRecord> transformationChain,
-                          KafkaConsumer<byte[], byte[]> consumer,
+                          Consumer<byte[], byte[]> consumer,
                           ClassLoader loader,
                           Time time,
                           RetryWithToleranceOperator 
retryWithToleranceOperator,
@@ -184,6 +184,14 @@ class WorkerSinkTask extends WorkerTask {
         Utils.closeQuietly(transformationChain, "transformation chain");
         Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
         Utils.closeQuietly(headerConverter, "header converter");
+        /*
+            Setting partition count explicitly to 0 to handle the case,
+            when the task fails, which would cause its consumer to leave the group.
+            This would cause onPartitionsRevoked to be invoked in the rebalance listener, but not onPartitionsAssigned,
+            so the metrics for the task (which are still available for failed tasks until they are explicitly revoked
+            from the worker) would become inaccurate.
+        */
+        sinkTaskMetricsGroup.recordPartitionCount(0);
     }
 
     @Override
@@ -646,7 +654,6 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private void openPartitions(Collection<TopicPartition> partitions) {
-        updatePartitionCount();
         task.open(partitions);
     }
 
@@ -668,7 +675,6 @@ class WorkerSinkTask extends WorkerTask {
             origOffsets.keySet().removeAll(topicPartitions);
             currentOffsets.keySet().removeAll(topicPartitions);
         }
-        updatePartitionCount();
         lastCommittedOffsets.keySet().removeAll(topicPartitions);
     }
 
@@ -729,7 +735,7 @@ class WorkerSinkTask extends WorkerTask {
                 else if (!context.pausedPartitions().isEmpty())
                     consumer.pause(context.pausedPartitions());
             }
-
+            updatePartitionCount();
             if (partitions.isEmpty()) {
                 return;
             }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index f242ef4fe5d..71837960926 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.storage.ClusterConfigState;
@@ -36,14 +36,14 @@ public class WorkerSinkTaskContext implements 
SinkTaskContext {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Map<TopicPartition, Long> offsets;
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final Consumer<byte[], byte[]> consumer;
     private final WorkerSinkTask sinkTask;
     private final ClusterConfigState configState;
     private final Set<TopicPartition> pausedPartitions;
     private long timeoutMs;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer,
+    public WorkerSinkTaskContext(Consumer<byte[], byte[]> consumer,
                                  WorkerSinkTask sinkTask,
                                  ClusterConfigState configState) {
         this.offsets = new HashMap<>();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index c943e22948b..179a5c62aea 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
@@ -57,6 +58,7 @@ import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.IExpectationSetters;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -674,7 +676,7 @@ public class WorkerSinkTaskTest {
                 
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION));
                 return ConsumerRecords.empty();
             });
-        
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(4);
+        
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(3);
         sinkTask.close(Collections.singleton(TOPIC_PARTITION3));
         EasyMock.expectLastCall();
         
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
@@ -1381,7 +1383,7 @@ public class WorkerSinkTaskTest {
         expectTaskGetTopic(true);
 
         expectPollInitialAssignment();
-        
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
+        
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(1);
 
         // Put one message through the task to get some offsets to commit
         expectConsumerPoll(1);
@@ -1536,7 +1538,7 @@ public class WorkerSinkTaskTest {
         
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(offsetTp1);
         
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(offsetTp2);
         
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(offsetTp3);
-        EasyMock.expect(consumer.assignment()).andReturn(new 
HashSet<>(rebalancedPartitions)).times(6);
+        EasyMock.expect(consumer.assignment()).andReturn(new 
HashSet<>(rebalancedPartitions)).times(5);
 
         // onPartitionsAssigned - step 2
         sinkTask.open(EasyMock.eq(rebalancedPartitions));
@@ -1999,6 +2001,32 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPartitionCountInCaseOfPartitionRevocation() {
+        MockConsumer<byte[], byte[]> mockConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        // Setting up Worker Sink Task to check metrics
+        workerTask = new WorkerSinkTask(
+                taskId, sinkTask, statusListener, TargetState.PAUSED, 
workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, 
headerConverter,
+                transformationChain, mockConsumer, pluginLoader, time,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, 
statusBackingStore, Collections::emptyList);
+        mockConsumer.updateBeginningOffsets(new HashMap<TopicPartition, 
Long>() {{
+                put(TOPIC_PARTITION, 0 * 1L);
+                put(TOPIC_PARTITION2, 0 * 1L);
+            }});
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        // Initial Re-balance to assign INITIAL_ASSIGNMENT which is 
"TOPIC_PARTITION" and "TOPIC_PARTITION2"
+        mockConsumer.rebalance(INITIAL_ASSIGNMENT);
+        assertSinkMetricValue("partition-count", 2);
+        // Revoked "TOPIC_PARTITION" and second re-balance with 
"TOPIC_PARTITION2"
+        mockConsumer.rebalance(Collections.singleton(TOPIC_PARTITION2));
+        assertSinkMetricValue("partition-count", 1);
+        // Closing the Worker Sink Task which will update the partition count 
as 0.
+        workerTask.close();
+        assertSinkMetricValue("partition-count", 0);
+    }
+
     private void expectInitializeTask() {
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), 
EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
@@ -2047,7 +2075,7 @@ public class WorkerSinkTaskTest {
         sinkTask.open(INITIAL_ASSIGNMENT);
         EasyMock.expectLastCall().andThrow(e);
 
-        
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(3);
+        
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
         
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
             () -> {
                 
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);

Reply via email to