ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r482678606



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final 
Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {
                 LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum 
+ " smaller than offsetSum=" +
                              offsetSum + " on member " + uuid + ". This 
probably means the task is corrupted," +
                              " which in turn indicates that it will need to 
restore from scratch if it gets assigned." +
                              " The assignor will de-prioritize returning this 
task to this member in the hopes that" +
                              " some other member may be able to re-use its 
state.");
                 taskLagTotals.put(task, endOffsetSum);

Review comment:
       Noticed that we were logging this incorrectly if either of the negative 
sentinels came up




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to