ableegoldman commented on a change in pull request #10444:
URL: https://github.com/apache/kafka/pull/10444#discussion_r604505465



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1013,28 +1016,34 @@ void addRecordsToTasks(final ConsumerRecords<byte[], 
byte[]> records) {
      */
     int commit(final Collection<Task> tasksToCommit) {
         int committed = 0;
-        if (rebalanceInProgress) {
-            committed = -1;
-        } else {
-            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            try {
-                committed = 
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, 
consumedOffsetsAndMetadataPerTask);
-            } catch (final TimeoutException timeoutException) {
-                consumedOffsetsAndMetadataPerTask
-                    .keySet()
-                    .forEach(t -> 
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
-            }
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask = new HashMap<>();
+        try {
+            committed = 
commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, 
consumedOffsetsAndMetadataPerTask);
+        } catch (final TimeoutException timeoutException) {
+            consumedOffsetsAndMetadataPerTask
+                .keySet()
+                .forEach(t -> 
t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
         }
+
         return committed;
     }
 
     /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if committing offsets failed due to 
TimeoutException (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to 
TimeoutException (EOS)
      * @param consumedOffsetsAndMetadataPerTask an empty map that will be 
filled in with the prepared offsets
+     * @return number of committed offsets, or -1 if we are in the middle of a 
rebalance and cannot commit
      */
     private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final 
Collection<Task> tasksToCommit,
                                                                     final 
Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsAndMetadataPerTask) {
-        int committed = 0;
+        if (rebalanceInProgress) {
+            return -1;
+        }

Review comment:
       This is the only logical change, I moved this check from 
`TaskManager#commit` to this method. And added a bunch of comments/docs so we 
don't forget this again




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to