vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414651483



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {

Review comment:
      This is a good thought. I think the downside is mitigated by the fact 
that we do still assign all the tasks when we fail to fetch lags, so it's not 
like we make no progress while waiting for the next rebalance. The "endless 
cycle" is a concern, but I'm not sure how it could happen in practice: what 
would make brokers consistently fail to report end offsets, yet _not_ fail on 
any other API that Streams needs, especially since Streams has to query the 
end-offset API during restoration anyway?
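
   Just to make the control flow concrete, here's a rough sketch of how I read 
the flattened branches (paraphrasing the hunk above, not the actual patch; the 
non-HA branch at the end is my guess, since it isn't shown in the hunk):

```java
final TaskAssignor taskAssignor;
if (!lagComputationSuccessful) {
    // Couldn't fetch changelog end offsets: hand every task back to its previous
    // owner via the sticky assignor, and flag REBALANCE_NEEDED so that a follow-up
    // rebalance retries the HA assignment once lags can actually be computed.
    log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
                 + "trigger another rebalance to retry.");
    setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
    taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
} else if (highAvailabilityEnabled) {
    taskAssignor = new HighAvailabilityTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs);
} else {
    // Non-HA default path, not shown in the hunk above; the final flag is a guess.
    taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, false);
}
```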
   
   It seems like the failure would either be transient or permanent(ish).
   
   If transient, then Streams will make progress during the 
probing.rebalance.interval, and succeed in balancing the assignment later. Even 
if we get further transient exceptions _during_ the sequence of HATA probing 
rebalances, the fact that we just return all tasks to their prior owners, and 
that the HATA is stable, means that we only delay convergence by a single 
probing.rebalance.interval rather than starting all over again.
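
   As an aside, the probing interval in question is the KIP-441 Streams config 
`probing.rebalance.interval.ms` (ten minutes by default). A minimal, 
illustrative sketch of setting it; the app id and bootstrap servers below are 
made up:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: a transient end-offset fetch failure pushes convergence
// back by roughly one probing interval, since the HATA resumes from the prior
// (stable) assignment at the next probing rebalance.
public final class ProbingIntervalExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ha-assignor-demo");  // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L); // the default
    }
}
```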
   
   If permanent, then Streams will fail anyway _after_ the assignment 
completes, since it also tends to query the end offsets immediately after 
getting the assignment. Even if it gets all of its prior tasks returned, which 
would let it skip the restoration phase, it seems implausible that we'd see a 
permanent failure on _only_ the end-offset API while Streams remains happily 
able to poll, commit, manage transactions, etc.
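
   To spell out the "query the end offsets immediately after getting the 
assignment" point: restoration needs an end-offset lookup of roughly this 
shape (hypothetical helper, not the actual StoreChangelogReader code), so a 
permanently broken end-offset API stalls us right after assignment anyway:

```java
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, just to show the shape of the call Streams depends on
// during restoration: if the broker can't answer endOffsets here, restoration
// (and thus processing) is blocked regardless of what the assignor decided.
final class ChangelogEndOffsets {
    static Map<TopicPartition, Long> fetch(final Consumer<byte[], byte[]> restoreConsumer,
                                           final Set<TopicPartition> changelogPartitions) {
        // Consumer#endOffsets throws (e.g. a TimeoutException) if the lookup keeps failing.
        return restoreConsumer.endOffsets(changelogPartitions);
    }
}
```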
   
   Our big alternative is just to immediately raise the exception, and leave it 
to KIP-572 to deal with the situation holistically. But I'm concerned that the 
impact of bombing out of assignment is greater than that of handling other 
failures during processing. It seems like an exception in assignment dooms the 
current Join/SyncGroup phase for everyone, which means that they have to wait 
for a timeout and then redo the rebalance. So KIP-572 can still recover 
gracefully, by reconstructing the consumer, but it can't avoid the extra 
downtime of waiting for the failed rebalance to time out and trying again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

