This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new b76e4f1  KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation (#11526)
b76e4f1 is described below

commit b76e4f16e3e825c5c6c800c6ab7cd3271e3caf41
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Mon Nov 29 14:26:08 2021 -0500

    KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation (#11526)
    
    The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via
    `Map::putAll`) after a successful offset commit, instead of being completely
    overwritten. In order to prevent this collection from growing indefinitely,
    elements are removed from it after topic partitions are revoked from the
    task's consumer.
    
    Two test cases are added to `WorkerSinkTaskTest`:
    
    - A basic test to verify the "rewind for redelivery" behavior when a task
      throws an exception from `SinkTask::preCommit`; surprisingly, no existing
      test cases appear to cover this scenario
    - A more sophisticated test to verify this same behavior, but with a few
      rounds of cooperative consumer rebalancing beforehand that expose a bug
      in the current logic for the `WorkerSinkTask` class
    
    The `VerifiableSinkTask` class is also updated to only flush the requested
    topic partitions in its `flush` method. This change is technically unrelated
    to the issue addressed by this PR and can be moved to a separate PR if
    necessary; it is included here because this bug was originally identified
    while debugging failed system tests, and the logic in this part of the tests
    was initially suspected as the cause of those failures.
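
    For reference, the per-partition buffering and selective flush pattern looks
    roughly like this (a simplified, hypothetical sketch, not the actual
    `VerifiableSinkTask` code; `String` stands in for both `TopicPartition` and
    the buffered record payloads):

        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.Set;

        class PerPartitionFlushSketch {
            // Records buffered per partition until a flush is requested
            private final Map<String, List<String>> unflushed = new HashMap<>();

            void put(String partition, String record) {
                unflushed.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
            }

            // Flush only the requested partitions, leaving records buffered for
            // any other partition untouched
            void flush(Set<String> partitionsToFlush) {
                for (String partition : partitionsToFlush) {
                    List<String> records = unflushed.remove(partition);
                    if (records == null) {
                        continue;
                    }
                    records.forEach(System.out::println);
                }
            }
        }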
    
    Reviewers: Konstantine Karantasis <k.karanta...@gmail.com>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  14 +--
 .../kafka/connect/tools/VerifiableSinkTask.java    |  33 ++++--
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 129 +++++++++++++++++++++
 3 files changed, 157 insertions(+), 19 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3e1eda9..08b9707 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -274,8 +274,9 @@ class WorkerSinkTask extends WorkerTask {
                 log.debug("{} Finished offset commit successfully in {} ms for 
sequence number {}: {}",
                         this, durationMillis, seqno, committedOffsets);
                 if (committedOffsets != null) {
-                    log.debug("{} Setting last committed offsets to {}", this, 
committedOffsets);
-                    lastCommittedOffsets = committedOffsets;
+                    log.trace("{} Adding to last committed offsets: {}", this, 
committedOffsets);
+                    lastCommittedOffsets.putAll(committedOffsets);
+                    log.debug("{} Last committed offsets are now {}", this, 
committedOffsets);
                     
sinkTaskMetricsGroup.recordCommittedOffsets(committedOffsets);
                 }
                 commitFailures = 0;
@@ -671,9 +672,10 @@ class WorkerSinkTask extends WorkerTask {
                 workerErrantRecordReporter.cancelFutures(topicPartitions);
                 log.trace("Cancelled all reported errors for {}", 
topicPartitions);
             }
-            topicPartitions.forEach(currentOffsets::remove);
+            currentOffsets.keySet().removeAll(topicPartitions);
         }
         updatePartitionCount();
+        lastCommittedOffsets.keySet().removeAll(topicPartitions);
     }
 
     private void updatePartitionCount() {
@@ -912,10 +914,8 @@ class WorkerSinkTask extends WorkerTask {
         }
 
         void clearOffsets(Collection<TopicPartition> topicPartitions) {
-            topicPartitions.forEach(tp -> {
-                consumedOffsets.remove(tp);
-                committedOffsets.remove(tp);
-            });
+            consumedOffsets.keySet().removeAll(topicPartitions);
+            committedOffsets.keySet().removeAll(topicPartitions);
             computeSinkRecordLag();
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
index ee58213..ff71ff8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.sink.SinkTask;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -44,7 +45,7 @@ public class VerifiableSinkTask extends SinkTask {
     private String name; // Connector name
     private int id; // Task ID
 
-    private ArrayList<Map<String, Object>> unflushed = new ArrayList<>();
+    private final Map<TopicPartition, List<Map<String, Object>>> unflushed = new HashMap<>();
 
     @Override
     public String version() {
@@ -80,25 +81,33 @@ public class VerifiableSinkTask extends SinkTask {
                 dataJson = "Bad data can't be written as json: " + 
e.getMessage();
             }
             System.out.println(dataJson);
-            unflushed.add(data);
+            unflushed.computeIfAbsent(
+                    new TopicPartition(record.topic(), record.kafkaPartition()),
+                    tp -> new ArrayList<>()
+            ).add(data);
         }
     }
 
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
         long nowMs = System.currentTimeMillis();
-        for (Map<String, Object> data : unflushed) {
-            data.put("time_ms", nowMs);
-            data.put("flushed", true);
-            String dataJson;
-            try {
-                dataJson = JSON_SERDE.writeValueAsString(data);
-            } catch (JsonProcessingException e) {
-                dataJson = "Bad data can't be written as json: " + 
e.getMessage();
+        for (TopicPartition topicPartition : offsets.keySet()) {
+            if (!unflushed.containsKey(topicPartition)) {
+                continue;
             }
-            System.out.println(dataJson);
+            for (Map<String, Object> data : unflushed.get(topicPartition)) {
+                data.put("time_ms", nowMs);
+                data.put("flushed", true);
+                String dataJson;
+                try {
+                    dataJson = JSON_SERDE.writeValueAsString(data);
+                } catch (JsonProcessingException e) {
+                    dataJson = "Bad data can't be written as json: " + 
e.getMessage();
+                }
+                System.out.println(dataJson);
+            }
+            unflushed.remove(topicPartition);
         }
-        unflushed.clear();
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 08a0458..1600dcf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -661,6 +661,90 @@ public class WorkerSinkTaskTest {
     }
 
     @Test
+    public void testPreCommitFailureAfterPartialRevocationAndAssignment() throws Exception {
+        createTask(initialState);
+
+        // First poll; assignment is [TP1, TP2]
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+        expectPollInitialAssignment();
+
+        // Second poll; a single record is delivered from TP1
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        // Third poll; assignment changes to [TP2]
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
+                () -> {
+                    rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
+                    rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
+                    return ConsumerRecords.empty();
+                });
+        EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION)).times(2);
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
+        consumer.commitSync(offsets);
+        EasyMock.expectLastCall();
+        sinkTask.close(Collections.singleton(TOPIC_PARTITION));
+        EasyMock.expectLastCall();
+        sinkTask.put(Collections.emptyList());
+        EasyMock.expectLastCall();
+
+        // Fourth poll; assignment changes to [TP2, TP3]
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
+                () -> {
+                    rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+                    rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+                    return ConsumerRecords.empty();
+                });
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
+        sinkTask.open(Collections.singleton(TOPIC_PARTITION3));
+        EasyMock.expectLastCall();
+        sinkTask.put(Collections.emptyList());
+        EasyMock.expectLastCall();
+
+        // Fifth poll; an offset commit takes place
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2);
+        final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+        workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        workerCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(workerCurrentOffsets);
+        EasyMock.expectLastCall().andThrow(new ConnectException("Failed to 
flush"));
+
+        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+
+        expectConsumerPoll(0);
+        sinkTask.put(EasyMock.eq(Collections.emptyList()));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        // First iteration--first call to poll, first consumer assignment
+        workerTask.iteration();
+        // Second iteration--second call to poll, delivery of one record
+        workerTask.iteration();
+        // Third iteration--third call to poll, partial consumer revocation
+        workerTask.iteration();
+        // Fourth iteration--fourth call to poll, partial consumer assignment
+        workerTask.iteration();
+        // Fifth iteration--task-requested offset commit with failure in SinkTask::preCommit
+        sinkTaskContext.getValue().requestCommit();
+        workerTask.iteration();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testWakeupInCommitSyncCausesRetry() throws Exception {
         createTask(initialState);
 
@@ -971,6 +1055,51 @@ public class WorkerSinkTaskTest {
     }
 
     @Test
+    public void testPreCommitFailure() throws Exception {
+        createTask(initialState);
+
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+        EasyMock.expect(consumer.assignment()).andStubReturn(INITIAL_ASSIGNMENT);
+
+        // iter 1
+        expectPollInitialAssignment();
+
+        // iter 2
+        expectConsumerPoll(2);
+        expectConversionAndTransformation(2);
+        sinkTask.put(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        // iter 3
+        final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+        workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(workerCurrentOffsets);
+        EasyMock.expectLastCall().andThrow(new ConnectException("Failed to 
flush"));
+
+        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+
+        expectConsumerPoll(0);
+        sinkTask.put(EasyMock.eq(Collections.emptyList()));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration(); // iter 1 -- initial assignment
+        workerTask.iteration(); // iter 2 -- deliver 2 records
+        sinkTaskContext.getValue().requestCommit();
+        workerTask.iteration(); // iter 3 -- commit
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testIgnoredCommit() throws Exception {
         createTask(initialState);
 
