lucasbru commented on code in PR #20833:
URL: https://github.com/apache/kafka/pull/20833#discussion_r2509227799
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
+ final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
Review Comment:
Shouldn't this go into `TaskManager.handleAssignment`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
+ final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
+ .values()
+ .stream()
+ .filter(t -> t.commitNeeded())
+ .collect(Collectors.toSet());
+ log.info("Committing {} tasks with open transactions before onAssignment()", tasksWithOpenTransactions.size());
+ taskManager.commit(tasksWithOpenTransactions);
Review Comment:
I think we throw from `onPartitionsRevoked` as well. But we do try to
finish as much of the revocation as possible before we rethrow the first
exception. For KIP-1071, `handleAssignment` is called from
`onPartitionsAssigned`, so it will work fairly similarly.
Note also the comment below:
```
// we do not capture any exceptions but just let the exception thrown from consumer.poll directly
// since when stream thread captures it, either we close all tasks as dirty or we close thread
```
That sounds like it should do the right thing already?
But yes, we need to dig deep to make sure we do not break the error handling
here.
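
To make the "finish as much as possible, then rethrow the first exception" pattern concrete, here is a minimal standalone sketch. It is not the actual `TaskManager` or revocation code; the class `FirstExceptionSketch` and the methods `runAll` and `demo` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
class FirstExceptionSketch {

    // Runs every step even if an earlier one fails, remembers only the
    // first failure, and rethrows it after all steps have been attempted.
    static void runAll(final List<Runnable> steps) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        for (final Runnable step : steps) {
            try {
                step.run();
            } catch (final RuntimeException e) {
                // Keep the first exception; later ones are dropped.
                firstException.compareAndSet(null, e);
            }
        }
        final RuntimeException first = firstException.get();
        if (first != null) {
            throw first;
        }
    }

    // Small demo: steps "a" and "c" succeed, two other steps fail.
    static String demo() {
        final List<String> completed = new ArrayList<>();
        final List<Runnable> steps = new ArrayList<>();
        steps.add(() -> completed.add("a"));
        steps.add(() -> { throw new IllegalStateException("first"); });
        steps.add(() -> completed.add("c"));
        steps.add(() -> { throw new IllegalStateException("second"); });
        try {
            runAll(steps);
            return "completed=" + completed + ", rethrown=none";
        } catch (final RuntimeException e) {
            return "completed=" + completed + ", rethrown=" + e.getMessage();
        }
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that a commit added before the assignment handling would have to fit the same shape: a failure in the new step should not prevent the remaining steps from running, and the caller should still see the first exception.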
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
+ final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
+ .values()
+ .stream()
+ .filter(t -> t.commitNeeded())
Review Comment:
I'm a bit confused. Will the commit actually do anything here in the
classic protocol? I think `rebalanceInProgress` will be true when `onAssignment`
is called, so the commit will be skipped, no?