vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414651483
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
                                                      allTasks,
                                                      clientStates,
                                                      numStandbyReplicas());

         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {

Review comment:

This is a good thought. I think the downside is mitigated by the fact that we still assign all the tasks when we fail to fetch lags, so it's not like we make no progress while waiting for the next rebalance.

The "endless cycle" is a concern, but I'm not sure how it could happen in practice. That is, what would make brokers consistently fail to report end offsets, but _not_ fail on any other APIs that Streams needs, especially since Streams has to query the end-offset API during restoration anyway? It seems like the failure would be either transient or permanent(ish).

If transient, then Streams will make progress during the probing.rebalance.interval and succeed in balancing the assignment later. Even if we hit further transient exceptions _during_ the sequence of HATA probing rebalances, the fact that we just return all tasks to their prior owners, combined with the HATA being stable, means we only delay convergence by a single probing.rebalance.interval rather than starting all over again.

If permanent, then Streams will fail anyway _after_ the assignment completes, since it also tends to query the end offsets immediately after getting the assignment. Even if it gets all prior tasks returned, which would let it skip the restoration phase, it seems implausible that we'd see a permanent failure on _only_ the end-offset API while Streams remained happily able to poll, commit, manage transactions, etc.

Our big alternative is to immediately raise the exception and leave it to KIP-572 to deal with the situation holistically. But I'm concerned that the impact of bombing out of assignment is greater than that of handling other failures during processing. An exception in assignment seems to doom the current Join/SyncGroup phase for everyone, which means they all have to wait for a timeout and then redo the rebalance. So KIP-572 can still recover gracefully by reconstructing the consumer, but it can't avoid the extra downtime of waiting for the failed rebalance to time out and be retried.
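To make the fallback path discussed above concrete, here is a minimal, self-contained Java sketch of the pattern: when changelog end-offset lags cannot be fetched, hand every task back to its previous owner and request a follow-up rebalance instead of failing the current one. The class and method names below are hypothetical illustrations, not the actual StreamsPartitionAssignor internals.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "return prior tasks and retry" fallback discussed above.
// None of these names are real Kafka Streams internals.
final class LagFallbackSketch {

    /**
     * @param lagComputationSuccessful whether end offsets for the changelogs could be fetched
     * @param previousAssignment       tasks each client owned before this rebalance
     * @param newAssignment            output map to fill with the assignment to hand out
     * @return true if a follow-up (probing-style) rebalance should be requested
     */
    static boolean assignTasks(final boolean lagComputationSuccessful,
                               final Map<String, Set<Integer>> previousAssignment,
                               final Map<String, Set<Integer>> newAssignment) {
        if (!lagComputationSuccessful) {
            // Could not fetch changelog end offsets: keep everyone productive on their
            // old tasks and schedule another rebalance, so convergence is delayed by at
            // most one probing interval rather than starting over.
            newAssignment.clear();
            previousAssignment.forEach((client, tasks) -> newAssignment.put(client, new HashSet<>(tasks)));
            return true;
        }
        // Lags are known: a proper high-availability assignment would be computed here (omitted).
        return false;
    }

    public static void main(final String[] args) {
        final Map<String, Set<Integer>> previous = Map.of("client-1", Set.of(0, 1), "client-2", Set.of(2));
        final Map<String, Set<Integer>> next = new HashMap<>();
        final boolean retry = assignTasks(false, previous, next);
        System.out.println("follow-up rebalance needed: " + retry + ", assignment: " + next);
    }
}
```

In released Kafka Streams, the cadence of those follow-up rebalances is governed by the `probing.rebalance.interval.ms` config, which is the interval the comment above refers to.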