vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416283374
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String>
allSourceTopics,
allTasks, clientStates, numStandbyReplicas());
final TaskAssignor taskAssignor;
- if (highAvailabilityEnabled) {
- if (lagComputationSuccessful) {
- taskAssignor = new HighAvailabilityTaskAssignor(
- clientStates,
- allTasks,
- statefulTasks,
- assignmentConfigs);
- } else {
- log.info("Failed to fetch end offsets for changelogs, will
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
- setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
- taskAssignor = new StickyTaskAssignor(clientStates, allTasks,
statefulTasks, assignmentConfigs, true);
- }
+ if (!lagComputationSuccessful) {
Review comment:
Hah! You know I can't let that happen :)
I think the fallback assignor is special in that, if we are unable to
satisfy the TaskAssignor interface (because we couldn't compute lags), then at
least we'd produce some kind of assignment. I.e., it's really just hitting the
"abort" button on the whole assignment. In contrast to the internal,
emergency-mode panic assignor, the StickyTaskAssignor is a regular, pluggable,
assignor that people could use.
We'll have to revisit this topic anyway before making TaskAssignor a public
API. Since it seems like both you and @cadonna view this change with suspicion,
I'll add a special case for the StickyTaskAssignor, preserving the behavior
before this PR.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]