cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966762475


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final 
Set<TopicPartition> remain
             for (final Task restoringTask : stateUpdater.getTasks()) {
                 if (restoringTask.isActive()) {
                     if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        tasks.addPendingTaskToCloseClean(restoringTask.id());
+                        
tasks.addPendingActiveTaskToSuspend(restoringTask.id());
                         stateUpdater.remove(restoringTask.id());
                         
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());

Review Comment:
   No, an input partition can only be assigned to one consumer in a consumer 
group, because Kafka only allows to commit one offset per input partition. If 
an input partition were shared by two consumers would commit two potentially 
different offsets for the same input partition. So one consumer would overwrite 
the offset of the other. In case of fail-over, one consumer would read an 
offset that it did not commit before the fail-over and  restart processing at 
the wrong offset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to