Navinder Brar created KAFKA-9169:
------------------------------------

             Summary: Standby tasks ask for incorrect offsets on resuming 
post suspension
                 Key: KAFKA-9169
                 URL: https://issues.apache.org/jira/browse/KAFKA-9169
             Project: Kafka
          Issue Type: New Feature
          Components: streams
            Reporter: Navinder Brar


In versions (see 2.0) where standby tasks are suspended on each rebalance, the 
checkpoint file is updated after the flush. The expected behaviour is that if 
the same standby task is assigned back to the same machine after the rebalance, 
it resumes reading the changelog from the offset where it left off. 

 

But there appears to be a bug in the code: after every rebalance the task 
starts reading from the offset it read from the first time it was assigned to 
this machine. This has 2 repercussions:
 # After every rebalance the standby tasks restore a huge amount of data that 
they have already restored earlier (verified via a 300x increase in network IO 
on all streams instances post rebalance, even with no change in assignment).
 # If the changelog has time-based retention, those offsets may no longer be 
available in the changelog, which leads to offsetOutOfRange exceptions, and the 
stores get deleted and recreated.
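
To illustrate the second repercussion, here is a minimal sketch of why a stale 
offset fails once retention has advanced the start of the changelog. The class 
ChangelogRetentionSketch, the method isOutOfRange() and all offset values are 
made up for illustration; this is not Kafka code, only a model of the range 
check a fetch is subject to.

```java
// Hypothetical, simplified model: not part of Kafka Streams.
final class ChangelogRetentionSketch {

    // A fetch position is valid only between the log start offset (oldest
    // retained record) and the log end offset (next offset to be written).
    static boolean isOutOfRange(long requestedOffset, long logStartOffset, long logEndOffset) {
        return requestedOffset < logStartOffset || requestedOffset > logEndOffset;
    }

    public static void main(String[] args) {
        long firstAssignmentOffset = 100L;    // assumed: offset cached when the task was first created
        long lastCheckpointedOffset = 5_000L; // assumed: offset written to the checkpoint file on commit
        long logStartOffset = 2_000L;         // assumed: retention has deleted everything below this
        long logEndOffset = 5_200L;           // assumed: current end of the changelog partition

        // The stale offset falls below the retained range.
        System.out.println(isOutOfRange(firstAssignmentOffset, logStartOffset, logEndOffset));  // true
        // The last checkpointed offset is still inside the retained range.
        System.out.println(isOutOfRange(lastCheckpointedOffset, logStartOffset, logEndOffset)); // false
    }
}
```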

 

I have gone through the code and I think I know the issue.

In TaskManager#updateNewAndRestoringTasks(), the function 
assignStandbyPartitions() gets called for all the running standby tasks. It 
populates the map checkpointedOffsets from standbyTask.checkpointedOffsets(), 
which is only updated when a StandbyTask is initialized (i.e. in its 
constructor). 

 

This has an easy fix.

Post resumption we read standbyTask.checkpointedOffsets() to find the offset 
from which the standby task should resume, rather than 
stateMgr.checkpointed(), which is updated on every commit to the checkpoint 
file. In the former case the task always reads from the same offset, including 
data it has already read, and where the changelog topic has a retention time 
this results in an offsetOutOfRange exception. So 
standbyTask.checkpointedOffsets() is of little use here, and we should use 
stateMgr.checkpointed() instead to return offsets to the task manager.
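
The difference between the two lookups can be sketched with a small 
stand-alone model. This is only an illustration under assumptions, not the 
actual Kafka Streams classes: StateManagerModel, StandbyTaskModel, commit() and 
the two accessor names are hypothetical stand-ins for the state manager, the 
standby task, and the two ways of obtaining offsets described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model: these are not the real Kafka Streams classes.
final class StandbyCheckpointSketch {

    // Stand-in for the state manager: its checkpointed offsets advance on every commit.
    static final class StateManagerModel {
        private final Map<String, Long> checkpointed = new HashMap<>();

        void commit(String partition, long offset) {
            checkpointed.put(partition, offset);
        }

        Map<String, Long> checkpointed() {
            return new HashMap<>(checkpointed); // defensive copy of the current state
        }
    }

    // Stand-in for a standby task that snapshots the offsets once, in its constructor.
    static final class StandbyTaskModel {
        private final StateManagerModel stateMgr;
        private final Map<String, Long> offsetsAtConstruction;

        StandbyTaskModel(StateManagerModel stateMgr) {
            this.stateMgr = stateMgr;
            this.offsetsAtConstruction = stateMgr.checkpointed(); // never refreshed afterwards
        }

        // Models the reported behaviour: returns the snapshot taken at construction.
        Map<String, Long> staleCheckpointedOffsets() {
            return offsetsAtConstruction;
        }

        // Models the proposed fix: ask the state manager each time.
        Map<String, Long> liveCheckpointedOffsets() {
            return stateMgr.checkpointed();
        }
    }

    public static void main(String[] args) {
        String partition = "store-changelog-0"; // assumed partition name
        StateManagerModel stateMgr = new StateManagerModel();
        stateMgr.commit(partition, 100L);        // offset known when the task is first assigned

        StandbyTaskModel task = new StandbyTaskModel(stateMgr);
        stateMgr.commit(partition, 5_000L);      // progress made before the task is suspended

        // On resume, the snapshot still points at the first offset.
        System.out.println(task.staleCheckpointedOffsets().get(partition)); // 100
        System.out.println(task.liveCheckpointedOffsets().get(partition));  // 5000
    }
}
```

In this model, resuming from the snapshot re-reads everything between offset 
100 and 5000, which matches the repeated restoration seen after each rebalance.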



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
