ableegoldman commented on a change in pull request #8892: URL: https://github.com/apache/kafka/pull/8892#discussion_r482678606
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ########## @@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO final Long endOffsetSum = taskEntry.getValue(); final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L); - if (endOffsetSum < offsetSum) { + if (offsetSum == Task.LATEST_OFFSET) { + taskLagTotals.put(task, Task.LATEST_OFFSET); + } else if (offsetSum == UNKNOWN_OFFSET_SUM) { + taskLagTotals.put(task, UNKNOWN_OFFSET_SUM); + } else if (endOffsetSum < offsetSum) { LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" + offsetSum + " on member " + uuid + ". This probably means the task is corrupted," + " which in turn indicates that it will need to restore from scratch if it gets assigned." + " The assignor will de-prioritize returning this task to this member in the hopes that" + " some other member may be able to re-use its state."); taskLagTotals.put(task, endOffsetSum); Review comment: Noticed that we were logging this incorrectly if either of the negative sentinels came up ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org