ncliang commented on a change in pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#discussion_r624411177



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -631,13 +648,31 @@ private void rewind() {
     }
 
     private void openPartitions(Collection<TopicPartition> partitions) {
-        sinkTaskMetricsGroup.recordPartitionCount(partitions.size());
+        updatePartitionCount();
         task.open(partitions);
     }
 
-    private void closePartitions() {
-        commitOffsets(time.milliseconds(), true);
-        sinkTaskMetricsGroup.recordPartitionCount(0);
+    private void closeAllPartitions() {
+        closePartitions(currentOffsets.keySet(), false);
+    }
+
+    private void closePartitions(Collection<TopicPartition> topicPartitions, 
boolean lost) {
+        if (!lost) {
+            commitOffsets(time.milliseconds(), true, topicPartitions);
+        } else {
+            log.trace("{} Closing the task as partitions have been lost: {}", 
this, topicPartitions);
+            task.close(topicPartitions);
+            if (workerErrantRecordReporter != null) {
+                log.trace("Cancelling reported errors for {}", 
topicPartitions);
+                workerErrantRecordReporter.cancelFutures(topicPartitions);

Review comment:
       I see. So the distinction is because in the revoked case we get a chance 
to await on the errant record reporter futures before we commit offsets for the 
revoked partitions, but in the lost case we've already lost the partition and 
don't get a chance to commit offsets. Since we already do not own the 
partition, we should not be reporting errors for it and should let the current 
owner take that responsibility. It should be noted that the cancelation is best 
effort, so there is a chance we duplicate reporting for the errant record.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to