[ 
https://issues.apache.org/jira/browse/HUDI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuwei Xiao updated HUDI-4521:
-----------------------------
    Description: 
The current _*StreamWriteOperatorCoordinator*_ in Flink will try to re-commit 
the last batch during restart, and users may rely on this behavior to achieve 
exactly-once semantics when reading from a source like Kafka.
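
Concretely, the restart-time behavior looks roughly like the sketch below. This is a hedged, minimal illustration; the {{Timeline}} interface and {{pendingInstantFromLastCheckpoint}} method are hypothetical names, not the actual Hudi API:

{code:java}
import java.util.Optional;

// Hypothetical sketch of the restart-time re-commit behavior; the Timeline
// interface and its method names are illustrative, not the real Hudi API.
public class RecommitOnStartSketch {

    interface Timeline {
        // The instant whose data was checkpointed but never committed, if any.
        Optional<String> pendingInstantFromLastCheckpoint();

        void commit(String instant);
    }

    static void startCoordinator(Timeline timeline) {
        // Finishing the pending instant makes the last checkpointed batch
        // durable, so a Kafka source can safely resume at the offset stored
        // in the same checkpoint (exactly-once across source and sink).
        timeline.pendingInstantFromLastCheckpoint().ifPresent(instant -> {
            System.out.println("re-committing pending instant " + instant);
            timeline.commit(instant);
        });
    }

    public static void main(String[] args) {
        // Simulate a restart where CkpId=1 was checkpointed but not committed.
        startCoordinator(new Timeline() {
            @Override
            public Optional<String> pendingInstantFromLastCheckpoint() {
                return Optional.of("20220801120000");
            }

            @Override
            public void commit(String instant) {
                System.out.println("committed " + instant);
            }
        });
    }
}
{code}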

 

However, the re-commit may be skipped if the user sets a different 
_write.tasks_ value on restart, because the current implementation keeps a 
fixed buffer of _write.tasks_ slots to track events from subtasks and does not 
handle the case where the write parallelism changes.
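
A minimal sketch of such a slot-per-subtask buffer is below (hypothetical class names, not the actual coordinator code). It shows one way a re-commit check over a fixed-size buffer can silently skip after a 4 -> 8 rescale, where half the slots hold empty bootstrap events:

{code:java}
import java.util.Arrays;

// Minimal sketch (hypothetical names, not the actual coordinator code) of a
// fixed slot-per-subtask buffer sized by write.tasks, and one way a re-commit
// check over it can silently skip when the parallelism changed.
public class SlotTrackingSketch {

    static final class BootstrapEvent {
        final int subtaskId;
        final boolean restoredMetadata; // false for an empty bootstrap event

        BootstrapEvent(int subtaskId, boolean restoredMetadata) {
            this.subtaskId = subtaskId;
            this.restoredMetadata = restoredMetadata;
        }
    }

    private final BootstrapEvent[] slots; // one slot per current write task

    SlotTrackingSketch(int writeTasks) {
        this.slots = new BootstrapEvent[writeTasks];
    }

    void onEvent(BootstrapEvent event) {
        slots[event.subtaskId] = event;
        if (Arrays.stream(slots).allMatch(e -> e != null)) {
            // After a 4 -> 8 rescale, 4 slots hold empty events, so a check
            // that expects every slot to carry restored metadata never fires.
            boolean allRestored = Arrays.stream(slots).allMatch(e -> e.restoredMetadata);
            System.out.println(allRestored ? "re-commit last instant" : "re-commit skipped");
        }
    }

    public static void main(String[] args) {
        // Restored with write.tasks = 8 after checkpointing with 4 tasks:
        // only subtasks 0..3 carry restored metadata. Once all 8 events
        // arrive, the coordinator prints "re-commit skipped".
        SlotTrackingSketch coordinator = new SlotTrackingSketch(8);
        for (int i = 0; i < 8; i++) {
            coordinator.onEvent(new BootstrapEvent(i, i < 4));
        }
    }
}
{code}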

 

For example:
 # Start with _write.tasks = 4_; the application crashes (or stops) right 
after a Flink checkpoint (e.g., CkpId=1) completes but before the Hudi commit.
 # Restart with _write.tasks = 8_; the coordinator will receive 4 bootstrap 
events carrying restored metadata and 4 empty bootstrap events. Because the 
arrival order of these events is non-deterministic, the coordinator may not 
re-commit the last instant.
 # The source (e.g., a Kafka reader) uses the checkpoint ID to guide its 
consumption, so on restart it reads from the next offset recorded at 
{_}CkpId=1{_}. All data of that batch (i.e., CkpId=1) is then lost in Hudi; 
see the offset sketch after this example.

A similar problem occurs when _write.tasks_ is smaller on restart, 
e.g., 4 -> 2.
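
The toy simulation below (not Flink or Kafka code; the offsets and checkpoint bookkeeping are made up for illustration) shows why step 3 loses the batch: the source trusts the checkpoint for its resume offset, so it never replays the records whose Hudi commit was skipped:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Toy simulation (not Flink or Kafka code) of step 3: the source trusts the
// checkpoint for its resume offset, so the records whose Hudi commit was
// skipped are never replayed.
public class OffsetReplaySketch {

    public static void main(String[] args) {
        Map<Long, Long> checkpointedOffsets = new HashMap<>();

        // CkpId=1 completes after records 0..99 were read; the offset (100)
        // is stored in the checkpoint, but the job dies before the Hudi commit.
        checkpointedOffsets.put(1L, 100L);
        boolean hudiCommitHappened = false;

        // On restart from CkpId=1 the source resumes at offset 100 no matter
        // what happened on the sink side.
        long resumeOffset = checkpointedOffsets.get(1L);
        if (!hudiCommitHappened) {
            System.out.println("records 0.." + (resumeOffset - 1)
                    + " are never replayed and never committed to Hudi"
                    + " unless the coordinator re-commits the pending instant");
        }
    }
}
{code}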

 

This Jira fixes the implementation to ensure the re-commit is performed when 
_write.tasks_ changes. Although exactly-once semantics could also be repaired 
on the reader side (e.g., by tracking the ckpId in the Hudi commit metadata 
and using it to guide the reader), that would require Hudi users to change 
their application code.
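
One possible shape of the fix is sketched below, purely as an assumption about the direction (hypothetical names, not the actual patch): make the decision independent of the slot count by collecting bootstrap events until every subtask of the new parallelism has reported, then re-committing if any event restored metadata, regardless of how many empty events arrived or in what order:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hedged sketch of one possible fix direction (hypothetical names, not the
// actual patch): re-commit if ANY bootstrap event restored metadata, once all
// subtasks of the NEW parallelism have reported.
public class ParallelismAgnosticRecommit {

    static final class BootstrapEvent {
        final int subtaskId;
        final String pendingInstant; // null if the subtask restored nothing

        BootstrapEvent(int subtaskId, String pendingInstant) {
            this.subtaskId = subtaskId;
            this.pendingInstant = pendingInstant;
        }
    }

    private final int parallelism;
    private final List<BootstrapEvent> received = new ArrayList<>();

    ParallelismAgnosticRecommit(int parallelism) {
        this.parallelism = parallelism;
    }

    void onEvent(BootstrapEvent event) {
        received.add(event);
        if (received.size() == parallelism) {
            // Empty events no longer mask the restored ones, and the arrival
            // order is irrelevant.
            received.stream()
                    .map(e -> e.pendingInstant)
                    .filter(Objects::nonNull)
                    .findFirst()
                    .ifPresentOrElse(
                            instant -> System.out.println("re-commit instant " + instant),
                            () -> System.out.println("nothing to re-commit"));
        }
    }

    public static void main(String[] args) {
        // 4 -> 8 rescale: only 4 of 8 subtasks carry restored metadata,
        // yet the coordinator still prints "re-commit instant 20220801120000".
        ParallelismAgnosticRecommit coordinator = new ParallelismAgnosticRecommit(8);
        for (int i = 0; i < 8; i++) {
            coordinator.onEvent(new BootstrapEvent(i, i < 4 ? "20220801120000" : null));
        }
    }
}
{code}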


> Fix lost re-commit in rare restart case (changing write.tasks)
> --------------------------------------------------------------
>
>                 Key: HUDI-4521
>                 URL: https://issues.apache.org/jira/browse/HUDI-4521
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: core, flink
>            Reporter: Yuwei Xiao
>            Assignee: Yuwei Xiao
>            Priority: Major
>


