[
https://issues.apache.org/jira/browse/KAFKA-18168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17921792#comment-17921792
]
Janindu Pathirana edited comment on KAFKA-18168 at 1/28/25 4:33 PM:
--------------------------------------------------------------------
Hi [~mjsax] ,
I could not work on this issue for a few weeks because I was busy with
university assignments. Sorry about that.
I have now gone through the issue and have two specific questions, which I'll
ask in two parts.
_+Question 1+_
I think that flushing and checkpointing after a successful run of the
reprocessState or restoreState method in
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java/#L178]
will save the state once restoration finishes. I believe this would resolve the
issue encountered when *Scaling out* and on {*}Instance Restart{*}. I made the
code change and tested it: it created the checkpoint file, and restarts read
from the proper offset. It would be great if you could confirm whether this
approach is correct!
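To make the idea in Question 1 concrete, here is a minimal toy sketch of "checkpoint as soon as restoration succeeds". It is not the actual change to GlobalStateManagerImpl: the class RestoreThenCheckpointSketch, the LocalDisk stand-in, and the record and restoreAndCheckpoint helpers are all hypothetical names, and an in-memory object plays the role of the on-disk store and checkpoint file.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: none of these names are Kafka Streams classes. */
final class RestoreThenCheckpointSketch {

    /** Stand-in for what survives a restart: the store contents and the checkpoint file. */
    static final class LocalDisk {
        final Map<String, String> store = new HashMap<>();
        Long checkpointedOffset = null; // null models "no checkpoint file yet"
    }

    /** Hypothetical helper that builds one key/value record of the toy topic. */
    static Map.Entry<String, String> record(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /**
     * Replays the topic from the checkpointed offset (or from offset 0 when no
     * checkpoint exists) and then checkpoints immediately.
     * Returns the number of records that had to be replayed.
     */
    static int restoreAndCheckpoint(LocalDisk disk, List<Map.Entry<String, String>> topic) {
        int start = disk.checkpointedOffset == null ? 0 : disk.checkpointedOffset.intValue();
        int replayed = 0;
        for (int offset = start; offset < topic.size(); offset++) {
            Map.Entry<String, String> rec = topic.get(offset);
            disk.store.put(rec.getKey(), rec.getValue());
            replayed++;
        }
        // The proposed step: write the checkpoint right after a successful restore,
        // instead of waiting for further events on the topic.
        disk.checkpointedOffset = (long) topic.size();
        return replayed;
    }

    public static void main(String[] args) {
        LocalDisk disk = new LocalDisk();
        List<Map.Entry<String, String>> topic =
                Arrays.asList(record("a", "1"), record("b", "2"), record("a", "3"));

        System.out.println("first start replayed " + restoreAndCheckpoint(disk, topic)); // 3
        System.out.println("restart replayed " + restoreAndCheckpoint(disk, topic));     // 0
    }
}
```

In this toy model, the second call corresponds to an instance restart with no new traffic: because the checkpoint was written at the end of the first restore, nothing is replayed.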
_+Question 2+_
As for *Lack of Traffic,* periodic checkpointing, and checkpointing during
closing, I am unsure what change should be made. The reason is this comment in
the code:
{noformat}
when enforcing checkpoint is required, we should overwrite the checkpoint if it
is different from the old one; otherwise, we only overwrite the checkpoint if
it is largely different from the old one{noformat}
Ref -
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L78C35-L78C72]
If I'm not mistaken, this means that unless a checkpoint is enforced, we
checkpoint only when the offset difference is large enough (10K events). So
should we change the existing checkpointing approach and add periodic
checkpointing or checkpointing during closing?
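For reference, the rule described by the quoted code comment can be sketched roughly as follows. This is a simplified illustration, not the code in StateManagerUtil: the class name CheckpointDecisionSketch, the use of String partition names, the constant name, the strict "greater than" comparison, and summing the offset deltas across partitions are assumptions of this sketch. The 10,000 threshold is taken from the "10K events" mentioned in the issue.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified illustration of the quoted comment; not the Kafka Streams implementation. */
final class CheckpointDecisionSketch {

    // Assumed threshold, based on the "10K events" mentioned in the issue.
    static final long OFFSET_DELTA_THRESHOLD = 10_000L;

    /**
     * When a checkpoint is enforced, overwrite if the offsets differ at all;
     * otherwise overwrite only if they differ by more than the threshold.
     */
    static boolean checkpointNeeded(boolean enforceCheckpoint,
                                    Map<String, Long> oldOffsets,
                                    Map<String, Long> newOffsets) {
        long totalDelta = 0L;
        for (Map.Entry<String, Long> entry : newOffsets.entrySet()) {
            long oldOffset = oldOffsets.getOrDefault(entry.getKey(), 0L);
            totalDelta += Math.abs(entry.getValue() - oldOffset);
        }
        return enforceCheckpoint
                ? totalDelta > 0
                : totalDelta > OFFSET_DELTA_THRESHOLD;
    }

    public static void main(String[] args) {
        Map<String, Long> oldOffsets = new HashMap<>();
        oldOffsets.put("global-topic-0", 0L);

        Map<String, Long> newOffsets = new HashMap<>();
        newOffsets.put("global-topic-0", 500L);

        // Only 500 new events: no checkpoint unless it is enforced.
        System.out.println(checkpointNeeded(false, oldOffsets, newOffsets)); // false
        System.out.println(checkpointNeeded(true, oldOffsets, newOffsets));  // true
    }
}
```

Under this reading, checkpointing on close or right after a restore would amount to calling the check with enforcement switched on, so that any difference from the last checkpoint is written out.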
> GlobalKTable does not checkpoint restored offsets until next 10K events
> -----------------------------------------------------------------------
>
> Key: KAFKA-18168
> URL: https://issues.apache.org/jira/browse/KAFKA-18168
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.4.1, 3.8.1
> Reporter: Sergey Zyrianov
> Assignee: Janindu Pathirana
> Priority: Minor
>
> As in https://issues.apache.org/jira/browse/KAFKA-5241, there is a state of
> considerable size kept on a topic that backs a GlobalKTable. Restoring the
> GlobalKTable takes minutes before it is operational. After a successful
> restore, the checkpoint file is not created until a further 10K events happen
> on the topic.
> The following scenario illustrates the issue:
> # {*}Scaling Out{*}: When a new instance (e.g., pod X) is added to an
> already running set of instances (pods 0...X-1), the new instance will
> restore the state successfully. However, it will not create a checkpoint file
> until 10K events are processed on the {{GlobalKTable}} topic.
> # {*}Lack of Traffic{*}: If there is no new traffic on the {{GlobalKTable}}
> topic, there is no mechanism to force the creation of the checkpoint file.
> The state remains uncheckpointed. Ref
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L78C35-L78C72]
> # {*}Instance Restart{*}: If the new instance (pod X) is restarted (due to
> an update, for example) before 10K events have been processed, it will have to restore
> the entire state from the topic again, leading to the same time-consuming
> restoration process. This issue persists across restarts.
> IMO, checkpointing during the restore process and upon completion/close is
> missing in the current implementation.
>