This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ca05da95c33 KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630) ca05da95c33 is described below commit ca05da95c337f051e721464588d1b6678e8f2afb Author: kumarpritam863 <148938310+kumarpritam...@users.noreply.github.com> AuthorDate: Mon Nov 6 22:02:05 2023 +0530 KAFKA-15680: Fix sink task partition-count metric when cooperative consumer protocol is used (#14630) Reviewers: Chris Egerton <chr...@aiven.io> --- .../kafka/clients/consumer/MockConsumer.java | 6 ++-- .../kafka/clients/consumer/MockConsumerTest.java | 3 ++ .../kafka/connect/runtime/WorkerSinkTask.java | 18 +++++++---- .../connect/runtime/WorkerSinkTaskContext.java | 6 ++-- .../kafka/connect/runtime/WorkerSinkTaskTest.java | 36 +++++++++++++++++++--- 5 files changed, 52 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 53c13e4b98b..126d5eff02a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -104,15 +104,13 @@ public class MockConsumer<K, V> implements Consumer<K, V> { // rebalance this.records.clear(); - this.subscriptions.assignFromSubscribed(newAssignment); // rebalance callbacks - if (!added.isEmpty()) { - this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsAssigned(added)); - } if (!removed.isEmpty()) { this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsRevoked(removed)); } + this.subscriptions.assignFromSubscribed(newAssignment); + this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsAssigned(added)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 17247ba79b1..c03be92fb9a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -154,6 +154,9 @@ public class MockConsumerTest { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + if (partitions.isEmpty()) { + return; + } assigned.clear(); assigned.addAll(partitions); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 4d400260081..0f2e8f6eb27 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -19,9 +19,9 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -85,7 +85,7 @@ class WorkerSinkTask extends WorkerTask { private final TransformationChain<SinkRecord> transformationChain; private final SinkTaskMetricsGroup sinkTaskMetricsGroup; private final boolean isTopicTrackingEnabled; - private final KafkaConsumer<byte[], byte[]> consumer; + private final Consumer<byte[], byte[]> consumer; private WorkerSinkTaskContext context; private final List<SinkRecord> messageBatch; private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets; @@ -114,7 +114,7 @@ class WorkerSinkTask extends WorkerTask { ErrorHandlingMetrics errorMetrics, HeaderConverter headerConverter, TransformationChain<SinkRecord> transformationChain, - KafkaConsumer<byte[], byte[]> consumer, + Consumer<byte[], byte[]> consumer, ClassLoader loader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, @@ -184,6 +184,14 @@ class WorkerSinkTask extends WorkerTask { Utils.closeQuietly(transformationChain, "transformation chain"); Utils.closeQuietly(retryWithToleranceOperator, "retry operator"); Utils.closeQuietly(headerConverter, "header converter"); + /* + Setting partition count explicitly to 0 to handle the case, + when the task fails, which would cause its consumer to leave the group. + This would cause onPartitionsRevoked to be invoked in the rebalance listener, but not onPartitionsAssigned, + so the metrics for the task (which are still available for failed tasks until they are explicitly revoked + from the worker) would become inaccurate. + */ + sinkTaskMetricsGroup.recordPartitionCount(0); } @Override @@ -646,7 +654,6 @@ class WorkerSinkTask extends WorkerTask { } private void openPartitions(Collection<TopicPartition> partitions) { - updatePartitionCount(); task.open(partitions); } @@ -668,7 +675,6 @@ class WorkerSinkTask extends WorkerTask { origOffsets.keySet().removeAll(topicPartitions); currentOffsets.keySet().removeAll(topicPartitions); } - updatePartitionCount(); lastCommittedOffsets.keySet().removeAll(topicPartitions); } @@ -729,7 +735,7 @@ class WorkerSinkTask extends WorkerTask { else if (!context.pausedPartitions().isEmpty()) consumer.pause(context.pausedPartitions()); } - + updatePartitionCount(); if (partitions.isEmpty()) { return; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index f242ef4fe5d..71837960926 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.connect.runtime; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.IllegalWorkerStateException; import org.apache.kafka.connect.storage.ClusterConfigState; @@ -36,14 +36,14 @@ public class WorkerSinkTaskContext implements SinkTaskContext { private final Logger log = LoggerFactory.getLogger(getClass()); private final Map<TopicPartition, Long> offsets; - private final KafkaConsumer<byte[], byte[]> consumer; + private final Consumer<byte[], byte[]> consumer; private final WorkerSinkTask sinkTask; private final ClusterConfigState configState; private final Set<TopicPartition> pausedPartitions; private long timeoutMs; private boolean commitRequested; - public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, + public WorkerSinkTaskContext(Consumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask, ClusterConfigState configState) { this.offsets = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index c943e22948b..179a5c62aea 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; @@ -57,6 +58,7 @@ import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.IExpectationSetters; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -674,7 +676,7 @@ public class WorkerSinkTaskTest { rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION)); return ConsumerRecords.empty(); }); - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(4); + EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(3); sinkTask.close(Collections.singleton(TOPIC_PARTITION3)); EasyMock.expectLastCall(); EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); @@ -1381,7 +1383,7 @@ public class WorkerSinkTaskTest { expectTaskGetTopic(true); expectPollInitialAssignment(); - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2); + EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(1); // Put one message through the task to get some offsets to commit expectConsumerPoll(1); @@ -1536,7 +1538,7 @@ public class WorkerSinkTaskTest { EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(offsetTp1); EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(offsetTp2); EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(offsetTp3); - EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(rebalancedPartitions)).times(6); + EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(rebalancedPartitions)).times(5); // onPartitionsAssigned - step 2 sinkTask.open(EasyMock.eq(rebalancedPartitions)); @@ -1999,6 +2001,32 @@ public class WorkerSinkTaskTest { PowerMock.verifyAll(); } + @Test + public void testPartitionCountInCaseOfPartitionRevocation() { + MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + // Setting up Worker Sink Task to check metrics + workerTask = new WorkerSinkTask( + taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics, + keyConverter, valueConverter, errorHandlingMetrics, headerConverter, + transformationChain, mockConsumer, pluginLoader, time, + RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore, Collections::emptyList); + mockConsumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() {{ + put(TOPIC_PARTITION, 0 * 1L); + put(TOPIC_PARTITION2, 0 * 1L); + }}); + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + // Initial Re-balance to assign INITIAL_ASSIGNMENT which is "TOPIC_PARTITION" and "TOPIC_PARTITION2" + mockConsumer.rebalance(INITIAL_ASSIGNMENT); + assertSinkMetricValue("partition-count", 2); + // Revoked "TOPIC_PARTITION" and second re-balance with "TOPIC_PARTITION2" + mockConsumer.rebalance(Collections.singleton(TOPIC_PARTITION2)); + assertSinkMetricValue("partition-count", 1); + // Closing the Worker Sink Task which will update the partition count as 0. + workerTask.close(); + assertSinkMetricValue("partition-count", 0); + } + private void expectInitializeTask() { consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); @@ -2047,7 +2075,7 @@ public class WorkerSinkTaskTest { sinkTask.open(INITIAL_ASSIGNMENT); EasyMock.expectLastCall().andThrow(e); - EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(3); + EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2); EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( () -> { rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);