vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416223395



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {

Review comment:
       Thanks @ableegoldman,
   
   I thought about the marker interface also, but didn't mention it because it 
seems like a bad sign if we have two implementations and two interfaces from 
the very start.
   
   I think I'm getting a clue as to the contention when you mention 
"supplemental information". From my perspective, the TaskAssignor interface 
takes as input a "ClientState" for each instance in the cluster, which 
represents the current state of the cluster. One of the things it tells you is 
the lag for each task on the instance. How is this supplemental? It seems to be 
just one of the properties of the object. It actually happens to have a JavaDoc:
   
   ```java
       /**
        * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
        * did not have any state for this task on disk.
        *
        * @return end offset sum - offset sum
        *          Task.LATEST_OFFSET if this was previously an active running task on this client
        */
       long lagFor(final TaskId task)
   ```
   
   This is a private interface, so we can change this definition if it's not 
accurate. However, I'd be concerned about trying to program against the 
TaskAssignor interface if the caveat is that some of the arguments might be 
wrong or missing.
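
To make the concern concrete, here is a minimal sketch of an assignor-style helper that trusts `lagFor`. Everything in it (`LagContractSketch`, `SimpleClientState`, `leastLaggingClient`, the client names) is a hypothetical, simplified stand-in and not the real Kafka Streams code; it only illustrates why a caller cannot defend itself if the reported lag may be wrong or missing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; NOT the real Kafka Streams classes.
public class LagContractSketch {

    /** Minimal stand-in for ClientState: exposes only the per-task lag lookup. */
    interface SimpleClientState {
        long lagFor(String taskId);
    }

    /**
     * Returns the id of the client reporting the smallest lag for the task,
     * or null if there are no clients. It trusts every value from lagFor().
     */
    static String leastLaggingClient(final Map<String, SimpleClientState> clients,
                                     final String taskId) {
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (final Map.Entry<String, SimpleClientState> entry : clients.entrySet()) {
            final long lag = entry.getValue().lagFor(taskId);
            if (best == null || lag < bestLag) {
                best = entry.getKey();
                bestLag = lag;
            }
        }
        return best;
    }

    public static void main(final String[] args) {
        final Map<String, SimpleClientState> clients = new HashMap<>();
        clients.put("A", task -> 500L);
        clients.put("B", task -> 20L);
        // B reports the smallest lag, so it is chosen.
        System.out.println(leastLaggingClient(clients, "0_0"));

        // Assumption for illustration: lag computation failed for C and the
        // value was silently reported as 0. The helper now picks C, and it has
        // no way to tell that the number is bogus.
        clients.put("C", task -> 0L);
        System.out.println(leastLaggingClient(clients, "0_0"));
    }
}
```

The helper is correct only as long as the documented contract of `lagFor` holds for every client it is handed, which is why I'd rather the caller decide what to do when lag computation fails than have each assignor guess.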



