This is an automated email from the ASF dual-hosted git repository. kkarantasis pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push: new b76e4f1 KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation (#11526) b76e4f1 is described below commit b76e4f16e3e825c5c6c800c6ab7cd3271e3caf41 Author: Chris Egerton <chr...@confluent.io> AuthorDate: Mon Nov 29 14:26:08 2021 -0500 KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation (#11526) The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer. Two test cases are added to `WorkerSinkTaskTest`: - A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario - A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; it is included here because this bug was originally identified while debugging failed system tests, and the logic in this part of the tests was initially suspected as a cause of the test failure. 
Reviewers: Konstantine Karantasis <k.karanta...@gmail.com> --- .../kafka/connect/runtime/WorkerSinkTask.java | 14 +-- .../kafka/connect/tools/VerifiableSinkTask.java | 33 ++++-- .../kafka/connect/runtime/WorkerSinkTaskTest.java | 129 +++++++++++++++++++++ 3 files changed, 157 insertions(+), 19 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 3e1eda9..08b9707 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -274,8 +274,9 @@ class WorkerSinkTask extends WorkerTask { log.debug("{} Finished offset commit successfully in {} ms for sequence number {}: {}", this, durationMillis, seqno, committedOffsets); if (committedOffsets != null) { - log.debug("{} Setting last committed offsets to {}", this, committedOffsets); - lastCommittedOffsets = committedOffsets; + log.trace("{} Adding to last committed offsets: {}", this, committedOffsets); + lastCommittedOffsets.putAll(committedOffsets); + log.debug("{} Last committed offsets are now {}", this, committedOffsets); sinkTaskMetricsGroup.recordCommittedOffsets(committedOffsets); } commitFailures = 0; @@ -671,9 +672,10 @@ class WorkerSinkTask extends WorkerTask { workerErrantRecordReporter.cancelFutures(topicPartitions); log.trace("Cancelled all reported errors for {}", topicPartitions); } - topicPartitions.forEach(currentOffsets::remove); + currentOffsets.keySet().removeAll(topicPartitions); } updatePartitionCount(); + lastCommittedOffsets.keySet().removeAll(topicPartitions); } private void updatePartitionCount() { @@ -912,10 +914,8 @@ class WorkerSinkTask extends WorkerTask { } void clearOffsets(Collection<TopicPartition> topicPartitions) { - topicPartitions.forEach(tp -> { - consumedOffsets.remove(tp); - committedOffsets.remove(tp); - }); + 
consumedOffsets.keySet().removeAll(topicPartitions); + committedOffsets.keySet().removeAll(topicPartitions); computeSinkRecordLag(); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java index ee58213..ff71ff8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.sink.SinkTask; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -44,7 +45,7 @@ public class VerifiableSinkTask extends SinkTask { private String name; // Connector name private int id; // Task ID - private ArrayList<Map<String, Object>> unflushed = new ArrayList<>(); + private final Map<TopicPartition, List<Map<String, Object>>> unflushed = new HashMap<>(); @Override public String version() { @@ -80,25 +81,33 @@ public class VerifiableSinkTask extends SinkTask { dataJson = "Bad data can't be written as json: " + e.getMessage(); } System.out.println(dataJson); - unflushed.add(data); + unflushed.computeIfAbsent( + new TopicPartition(record.topic(), record.kafkaPartition()), + tp -> new ArrayList<>() + ).add(data); } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { long nowMs = System.currentTimeMillis(); - for (Map<String, Object> data : unflushed) { - data.put("time_ms", nowMs); - data.put("flushed", true); - String dataJson; - try { - dataJson = JSON_SERDE.writeValueAsString(data); - } catch (JsonProcessingException e) { - dataJson = "Bad data can't be written as json: " + e.getMessage(); + for (TopicPartition topicPartition : offsets.keySet()) { + if (!unflushed.containsKey(topicPartition)) { + continue; } - System.out.println(dataJson); + for (Map<String, Object> data : 
unflushed.get(topicPartition)) { + data.put("time_ms", nowMs); + data.put("flushed", true); + String dataJson; + try { + dataJson = JSON_SERDE.writeValueAsString(data); + } catch (JsonProcessingException e) { + dataJson = "Bad data can't be written as json: " + e.getMessage(); + } + System.out.println(dataJson); + } + unflushed.remove(topicPartition); } - unflushed.clear(); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 08a0458..1600dcf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -661,6 +661,90 @@ public class WorkerSinkTaskTest { } @Test + public void testPreCommitFailureAfterPartialRevocationAndAssignment() throws Exception { + createTask(initialState); + + // First poll; assignment is [TP1, TP2] + expectInitializeTask(); + expectTaskGetTopic(true); + expectPollInitialAssignment(); + + // Second poll; a single record is delivered from TP1 + expectConsumerPoll(1); + expectConversionAndTransformation(1); + sinkTask.put(EasyMock.anyObject()); + EasyMock.expectLastCall(); + + // Third poll; assignment changes to [TP2] + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( + () -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION)); + rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet()); + return ConsumerRecords.empty(); + }); + EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION)).times(2); + final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); + sinkTask.preCommit(offsets); + EasyMock.expectLastCall().andReturn(offsets); + 
consumer.commitSync(offsets); + EasyMock.expectLastCall(); + sinkTask.close(Collections.singleton(TOPIC_PARTITION)); + EasyMock.expectLastCall(); + sinkTask.put(Collections.emptyList()); + EasyMock.expectLastCall(); + + // Fourth poll; assignment changes to [TP2, TP3] + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( + () -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); + rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); + return ConsumerRecords.empty(); + }); + EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2); + EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); + sinkTask.open(Collections.singleton(TOPIC_PARTITION3)); + EasyMock.expectLastCall(); + sinkTask.put(Collections.emptyList()); + EasyMock.expectLastCall(); + + // Fifth poll; an offset commit takes place + EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).times(2); + final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); + workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + workerCurrentOffsets.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); + sinkTask.preCommit(workerCurrentOffsets); + EasyMock.expectLastCall().andThrow(new ConnectException("Failed to flush")); + + consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET); + EasyMock.expectLastCall(); + consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); + EasyMock.expectLastCall(); + + expectConsumerPoll(0); + sinkTask.put(EasyMock.eq(Collections.emptyList())); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + // First iteration--first call to poll, first consumer assignment + workerTask.iteration(); + // Second iteration--second call to poll, 
delivery of one record + workerTask.iteration(); + // Third iteration--third call to poll, partial consumer revocation + workerTask.iteration(); + // Fourth iteration--fourth call to poll, partial consumer assignment + workerTask.iteration(); + // Fifth iteration--task-requested offset commit with failure in SinkTask::preCommit + sinkTaskContext.getValue().requestCommit(); + workerTask.iteration(); + + PowerMock.verifyAll(); + } + + @Test public void testWakeupInCommitSyncCausesRetry() throws Exception { createTask(initialState); @@ -971,6 +1055,51 @@ public class WorkerSinkTaskTest { } @Test + public void testPreCommitFailure() throws Exception { + createTask(initialState); + + expectInitializeTask(); + expectTaskGetTopic(true); + EasyMock.expect(consumer.assignment()).andStubReturn(INITIAL_ASSIGNMENT); + + // iter 1 + expectPollInitialAssignment(); + + // iter 2 + expectConsumerPoll(2); + expectConversionAndTransformation(2); + sinkTask.put(EasyMock.anyObject()); + EasyMock.expectLastCall(); + + // iter 3 + final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); + workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2)); + workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); + sinkTask.preCommit(workerCurrentOffsets); + EasyMock.expectLastCall().andThrow(new ConnectException("Failed to flush")); + + consumer.seek(TOPIC_PARTITION, FIRST_OFFSET); + EasyMock.expectLastCall(); + consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET); + EasyMock.expectLastCall(); + + expectConsumerPoll(0); + sinkTask.put(EasyMock.eq(Collections.emptyList())); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + workerTask.iteration(); // iter 1 -- initial assignment + workerTask.iteration(); // iter 2 -- deliver 2 records + sinkTaskContext.getValue().requestCommit(); + workerTask.iteration(); // iter 3 -- commit + + 
PowerMock.verifyAll(); + } + + @Test public void testIgnoredCommit() throws Exception { createTask(initialState);