ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602935453



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -972,42 +1004,60 @@ void addRecordsToTasks(final ConsumerRecords<byte[], 
byte[]> records) {
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if task.timeout.ms has been exceeded (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to 
TimeoutException (EOS)
      * @return number of committed offsets, or -1 if we are in the middle of a 
rebalance and cannot commit
      */
     int commit(final Collection<Task> tasksToCommit) {
+        int committed = 0;
         if (rebalanceInProgress) {
-            return -1;
+            committed = -1;
         } else {
-            int committed = 0;
             final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
-                if (task.commitNeeded()) {
-                    final Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadata = task.prepareCommit();
-                    if (task.isActive()) {
-                        consumedOffsetsAndMetadataPerTask.put(task, 
offsetAndMetadata);
-                    }
-                }
-            }
-
             try {
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : tasksToCommit) {
-                    if (task.commitNeeded()) {
-                        task.clearTaskTimeout();
-                        ++committed;
-                        task.postCommit(false);
-                    }
-                }
+                committed = 
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, 
consumedOffsetsAndMetadataPerTask);
             } catch (final TimeoutException timeoutException) {
                 consumedOffsetsAndMetadataPerTask
                     .keySet()
                     .forEach(t -> 
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
             }
+        }
+        return committed;
+    }
+
+    /**
+     * Prepare, commit, and post-commit all tasks.
+     */
+    private void doCommit(final Collection<Task> tasksToCommit) {

Review comment:
       Fair enough




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to