[ 
https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16975693#comment-16975693
 ] 

Navinder Brar commented on KAFKA-9169:
--------------------------------------

I can confirm that this has been there since version 1.1.

> Standby Tasks point ask for incorrect offsets on resuming post suspension
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-9169
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9169
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Navinder Brar
>            Assignee: John Roesler
>            Priority: Critical
>             Fix For: 2.5.0
>
>
> In versions (e.g., 2.0) where standby tasks are suspended on each rebalance,
> the checkpoint file is updated after the flush. The expected behaviour is
> that when the same standby task is assigned back to the same machine after
> the rebalance, it resumes reading the changelog from the offset where it
> left off.
>  
> But there appears to be a bug in the code: after every rebalance, the task
> starts reading from the offset it started from the first time it was
> assigned to this machine. This has two repercussions:
>  # After every rebalance, the standby tasks start restoring a huge amount of
> data that they have already restored (verified via a 300x increase in
> network IO on all Streams instances after a rebalance, even when the
> assignment did not change).
>  # If the changelog has time-based retention, those offsets are no longer
> available in the changelog, which leads to OffsetOutOfRangeException, and
> the stores get deleted and recreated.
>  
> I have gone through the code and I think I know the cause.
> In TaskManager#updateNewAndRestoringTasks(), assignStandbyPartitions() is
> called for all running standby tasks. It populates the map
> checkpointedOffsets from standbyTask.checkpointedOffsets(), which is only
> set when a StandbyTask is initialized (i.e., in its constructor).
>  
> This has an easy fix.
> After resumption, we read standbyTask.checkpointedOffsets() to determine the
> offset from which the standby task should resume, instead of
> stateMgr.checkpointed(), which is updated on every commit to the checkpoint
> file. With the former, the task always restarts from the same offset,
> re-reading data it has already read, and when the changelog topic has a
> retention time, it gets an OffsetOutOfRangeException. So
> standbyTask.checkpointedOffsets() is effectively useless, and we should use
> stateMgr.checkpointed() instead to return offsets to the task manager.
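The cause and the proposed fix can be sketched as follows. This is a minimal illustration under stated assumptions, not Kafka's actual code: StateManagerSketch, StandbyTaskSketch, checkpointedOffsetsFixed(), ResumeDemo and the partition name are hypothetical; only the names checkpointedOffsets() and checkpointed() mirror the methods discussed in the report. The task snapshots the checkpoint once in its constructor, so the snapshot goes stale as later commits advance the checkpoint file, while delegating to the state manager always returns the latest committed offsets:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the state manager: checkpointed() reflects the
// checkpoint file, which is rewritten on every commit.
final class StateManagerSketch {
    private final Map<String, Long> checkpoint = new HashMap<>();

    void commit(String partition, long offset) {
        checkpoint.put(partition, offset);
    }

    // Returns a read-only copy of the offsets as of the last commit.
    Map<String, Long> checkpointed() {
        return Collections.unmodifiableMap(new HashMap<>(checkpoint));
    }
}

// Hypothetical stand-in for a standby task.
final class StandbyTaskSketch {
    private final StateManagerSketch stateMgr;
    private final Map<String, Long> checkpointedOffsetsAtInit;

    StandbyTaskSketch(StateManagerSketch stateMgr) {
        this.stateMgr = stateMgr;
        // Snapshot taken once, in the constructor, and never refreshed.
        this.checkpointedOffsetsAtInit = stateMgr.checkpointed();
    }

    // Behaviour described in the report: always the construction-time snapshot.
    Map<String, Long> checkpointedOffsets() {
        return checkpointedOffsetsAtInit;
    }

    // Proposed behaviour: delegate to the state manager's latest checkpoint.
    Map<String, Long> checkpointedOffsetsFixed() {
        return stateMgr.checkpointed();
    }
}

final class ResumeDemo {
    public static void main(String[] args) {
        StateManagerSketch stateMgr = new StateManagerSketch();
        stateMgr.commit("store-changelog-0", 1_000L); // checkpoint at first assignment
        StandbyTaskSketch task = new StandbyTaskSketch(stateMgr);
        stateMgr.commit("store-changelog-0", 9_900L); // later commits advance it

        // After a rebalance, the task manager asks where to resume.
        System.out.println(task.checkpointedOffsets());      // {store-changelog-0=1000}
        System.out.println(task.checkpointedOffsetsFixed()); // {store-changelog-0=9900}
    }
}
```

Delegating instead of caching keeps a single source of truth, the checkpoint file, which is the change the report proposes.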



--
This message was sent by Atlassian Jira
(v8.3.4#803005)