mjsax commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986158617
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1163,10 +1166,12 @@ void handleRevocation(final Collection<TopicPartition>
revokedPartitions) {
// as such we just need to skip those dirty tasks in the checkpoint
final Set<Task> dirtyTasks = new HashSet<>();
try {
- // in handleRevocation we must call commitOffsetsOrTransaction()
directly rather than
- // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make
sure we don't skip the
- // offset commit because we are in a rebalance
- taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+ if (revokedTasksNeedCommit) {
Review Comment:
Not sure if I can follow?
Your proposal would say, we stop committing if a TX is in-flight, but we do
want to commit for this case, right? Even if offset-map is empty.
And moving it to the outer-most context seems not to be "correct", because
checking if a TX is inflight for the ALOS case seems unnecessary (guess it
would not be wrong, because the call would just always return `false` so not
really changing anything effectively, but it seems unnecessary to change the
code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]