lucasbru commented on code in PR #20833:
URL: https://github.com/apache/kafka/pull/20833#discussion_r2509227799


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final 
int receivedAssignmentMet
 
     @Override
     public void onAssignment(final Assignment assignment, final 
ConsumerGroupMetadata metadata) {
+        final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()

Review Comment:
   Shouldn't this go into `TaskManager.handleAssignment`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final 
int receivedAssignmentMet
 
     @Override
     public void onAssignment(final Assignment assignment, final 
ConsumerGroupMetadata metadata) {
+        final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
+            .values()
+            .stream()
+            .filter(t -> t.commitNeeded())
+            .collect(Collectors.toSet());
+        log.info("Committing {} tasks with open transactions before 
onAssignment()", tasksWithOpenTransactions.size());
+        taskManager.commit(tasksWithOpenTransactions);

Review Comment:
   I think we throw from `onPartitionsRevoked` as well. But we indeed try to 
finish as much as possible of the revocation before we rethrow the first 
exception. For KIP-1071, `handleAssignment` is called from 
`onPartitionsAssigned` so it will work fairly similarly.
   
   Note also the comment below:
   
   ```
           // we do not capture any exceptions but just let the exception 
thrown from consumer.poll directly
           // since when stream thread captures it, either we close all tasks 
as dirty or we close thread
   ```
   
   That sounds like it should do the right thing already?
   
   But yes, we need to dig deep to make sure we do not break the error handling 
here.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final 
int receivedAssignmentMet
 
     @Override
     public void onAssignment(final Assignment assignment, final 
ConsumerGroupMetadata metadata) {
+        final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
+            .values()
+            .stream()
+            .filter(t -> t.commitNeeded())

Review Comment:
   I'm a bit confused. Will the commit actually do something here in the 
classic protocol? I think rebalanceInProgress will be true when `onAssignment` 
is called, so the commit will be skipped, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to