ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r482678606
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {
                 LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" +
                              offsetSum + " on member " + uuid + ". This probably means the task is corrupted," +
                              " which in turn indicates that it will need to restore from scratch if it gets assigned." +
                              " The assignor will de-prioritize returning this task to this member in the hopes that" +
                              " some other member may be able to re-use its state.");
                 taskLagTotals.put(task, endOffsetSum);
Review comment:
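Noticed that we were logging this warning incorrectly whenever offsetSum
was either of the negative sentinels (Task.LATEST_OFFSET or
UNKNOWN_OFFSET_SUM), so those two cases are now checked first.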
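
For illustration, here is a minimal standalone sketch of the branch ordering in the diff above. It is not the actual ClientState code: the class name, the computeLag helper, and the numeric sentinel values are assumptions made up for this example, and the final branch (end offset sum minus offset sum) is assumed, since the diff is cut off before it.

```java
final class TaskLagSketch {
    // Hypothetical stand-ins for Task.LATEST_OFFSET and UNKNOWN_OFFSET_SUM.
    // The concrete values are assumptions; what matters is that both are negative.
    static final long LATEST_OFFSET = -2L;
    static final long UNKNOWN_OFFSET_SUM = -3L;

    // Mirrors the order of checks in the diff: sentinels first, then the
    // "end offset smaller than offset" case, then an ordinary lag.
    static long computeLag(final long endOffsetSum, final long offsetSum) {
        if (offsetSum == LATEST_OFFSET) {
            // Sentinel is passed through as the lag; no warning is logged.
            return LATEST_OFFSET;
        } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
            // Sentinel is passed through as the lag; no warning is logged.
            return UNKNOWN_OFFSET_SUM;
        } else if (endOffsetSum < offsetSum) {
            // Only a real (non-sentinel) offset sum reaches this branch, so
            // the "probably corrupted" warning would be logged here.
            return endOffsetSum;
        } else {
            // Assumed ordinary case; this branch is not visible in the diff.
            return endOffsetSum - offsetSum;
        }
    }

    public static void main(final String[] args) {
        System.out.println(computeLag(100L, LATEST_OFFSET));      // -2
        System.out.println(computeLag(100L, UNKNOWN_OFFSET_SUM)); // -3
        System.out.println(computeLag(100L, 150L));               // 100
        System.out.println(computeLag(100L, 40L));                // 60
    }
}
```

With the sentinel checks ahead of the comparison, the corruption warning can only be reached by an offset sum that is an actual sum of offsets.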