guozhangwang commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r717027686



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -189,14 +189,13 @@ void handleCorruption(final Set<TaskId> corruptedTasks) {
 
         // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
         try {
-            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()

Review comment:
       No logical change here, just renamings.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -735,7 +735,6 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
         expectRestoreToBeCompleted(consumer, changeLogReader);
-        consumer.commitSync(eq(emptyMap()));

Review comment:
       The mock expectations in these unit tests changed because we originally 
sent empty offsets here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1084,26 +1083,27 @@ int commit(final Collection<Task> tasksToCommit) {
      *                               or if the task producer got fenced (EOS)
      * @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
      * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
-     * @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
+     * @param consumedOffsetsAndMetadata an empty map that will be filled in with the prepared offsets
      * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
      */
-    private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
-                                                                    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
+    private int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
+                                                            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
         if (rebalanceInProgress) {
             return -1;
         }
 
         int committed = 0;
         for (final Task task : tasksToCommit) {
+            // we need to call commitNeeded first since we need to update committable offsets
             if (task.commitNeeded()) {
                 final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
-                if (task.isActive()) {

Review comment:
       We do not need to check `task.isActive()`: for standby tasks, 
`prepareCommit()` always returns an empty offset map. Instead, we should just 
make sure the map is non-empty before adding it to the map of consumed offsets 
(for active tasks it may also be empty, depending on their state).
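
       To illustrate the point above, here is a minimal, self-contained sketch 
of the "skip empty maps instead of checking `isActive()`" pattern. It is not the 
actual `TaskManager` code: the `Task` interface, the `fixedTask` helper, the 
`collectNonEmptyOffsets` method, and the simplified `Map<String, Long>` offset 
type are all hypothetical stand-ins chosen so the example compiles on its own.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CommitSketch {

    // Hypothetical stand-in for the Streams Task type; only the two calls
    // discussed in the review comment are modelled.
    public interface Task {
        boolean commitNeeded();
        // Simplified offsets: partition name -> offset (assumption, not the real type).
        Map<String, Long> prepareCommit();
    }

    // Hypothetical helper that builds a task with fixed answers.
    public static Task fixedTask(final boolean commitNeeded, final Map<String, Long> offsets) {
        return new Task() {
            @Override
            public boolean commitNeeded() {
                return commitNeeded;
            }

            @Override
            public Map<String, Long> prepareCommit() {
                return offsets;
            }
        };
    }

    // Fills consumedOffsets with the prepared offsets of every task that needs
    // a commit, skipping tasks whose prepared offsets are empty. No isActive()
    // check is needed: a task that has nothing to commit is filtered out by
    // the emptiness check alone.
    public static void collectNonEmptyOffsets(final Collection<Task> tasksToCommit,
                                              final Map<Task, Map<String, Long>> consumedOffsets) {
        for (final Task task : tasksToCommit) {
            if (task.commitNeeded()) {
                final Map<String, Long> offsets = task.prepareCommit();
                if (!offsets.isEmpty()) {
                    consumedOffsets.put(task, offsets);
                }
            }
        }
    }

    public static void main(final String[] args) {
        final Task withOffsets = fixedTask(true, Map.of("topic-0", 42L));
        final Task emptyOffsets = fixedTask(true, Map.of()); // e.g. a standby task
        final Map<Task, Map<String, Long>> out = new LinkedHashMap<>();
        collectNonEmptyOffsets(List.of(withOffsets, emptyOffsets), out);
        System.out.println("tasks with offsets to commit: " + out.size());
    }
}
```

       In this sketch the task that returns an empty map never reaches the 
output map, so a caller only sees tasks that actually have offsets to commit.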




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

