Yuwei Xiao created HUDI-4521:
--------------------------------

             Summary: Fix lost re-commit in rare restart case (changing write.tasks)
                 Key: HUDI-4521
                 URL: https://issues.apache.org/jira/browse/HUDI-4521
             Project: Apache Hudi
          Issue Type: Bug
          Components: core, flink
            Reporter: Yuwei Xiao
            Assignee: Yuwei Xiao


The current `StreamWriteOperatorCoordinator` in Flink tries to re-commit the 
last batch during restart, and users may rely on this behavior to achieve 
exactly-once semantics when reading from sources like Kafka.

However, the re-commit may be skipped if the user sets a different 
`write.tasks` value on restart, because the current implementation keeps 
`write.tasks` slots to track events from the write subtasks and does not 
handle the case where the write parallelism changes.
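
A minimal sketch of one plausible way such slot-based tracking becomes order-sensitive (all class and member names below are invented simplifications for illustration, not the actual Hudi source):

```java
import java.util.Arrays;

// Hypothetical simplification: one slot per write subtask; once every slot
// is filled, the coordinator re-commits using the instant time carried by
// whichever bootstrap event happened to arrive last.
class CoordinatorSketch {
  private final BootstrapEvent[] eventBuffer; // sized by write.tasks

  CoordinatorSketch(int writeTasks) {
    this.eventBuffer = new BootstrapEvent[writeTasks];
  }

  void handleBootstrapEvent(int subtaskId, BootstrapEvent event) {
    eventBuffer[subtaskId] = event;
    if (Arrays.stream(eventBuffer).allMatch(e -> e != null)) {
      // Problem: an empty bootstrap event carries only a placeholder instant
      // time, so if one of them happens to arrive last, nothing matches a
      // pending instant and the re-commit is silently skipped.
      recommit(event.instantTime);
    }
  }

  private void recommit(String instantTime) {
    // Look up the buffered metadata for instantTime and commit it (elided).
  }

  static final class BootstrapEvent {
    final String instantTime; // "" for subtasks that restored no state
    BootstrapEvent(String instantTime) { this.instantTime = instantTime; }
  }
}
```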

For example:
 # Start with `write.tasks = 4`. The application crashes (or stops) right after a Flink checkpoint (e.g., CkpId=1) completes but before the Hudi commit.
 # Restart with `write.tasks = 8`. The coordinator will receive 4 bootstrap events carrying restored metadata and 4 empty bootstrap events. Since the arrival order of these events is not deterministic, the coordinator may not re-commit the last batch.
 # The source (e.g., a Kafka reader) uses the checkpoint ID to guide its consumption, so after the restart it reads from the next offset given by CkpId=1. All data of that batch (i.e., CkpId=1) is then lost in Hudi.

The same problem also occurs when `write.tasks` is decreased on restart, 
e.g., 4 -> 2.
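
To make the loss concrete, here is a toy timeline of the scenario above (all values illustrative):

```java
// Toy model of the failure: the Kafka source resumes from the offsets of
// the last completed checkpoint, so if the coordinator skips the re-commit
// of that checkpoint's batch, the batch exists neither in Kafka's replay
// range nor in the Hudi table.
public class LostBatchTimeline {
  public static void main(String[] args) {
    long committedToHudi = 0;     // last batch actually committed to Hudi
    long lastCheckpoint = 1;      // CkpId=1 completed right before the crash
    boolean recommitRan = false;  // skipped because write.tasks changed 4 -> 8

    long resumeFromBatch = lastCheckpoint + 1; // source restarts after CkpId=1
    if (!recommitRan && committedToHudi < lastCheckpoint) {
      System.out.printf(
          "batches (%d, %d] lost in Hudi; source resumes at batch %d%n",
          committedToHudi, lastCheckpoint, resumeFromBatch);
    }
  }
}
```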

This Jira will fix the implementation to ensure the re-commit is performed 
when `write.tasks` changes. Though exactly-once semantics could also be 
restored on the reader side (e.g., by tracking the checkpoint ID in the Hudi 
commit data and using it to guide the reader), that would require Hudi users 
to change their application code.
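
For illustration, an order-independent variant of the earlier sketch (the idea only, not necessarily the exact patch): derive the instant to re-commit from the buffered metadata-carrying events rather than from whichever event arrived last.

```java
import java.util.Arrays;

// Sketch of an order-independent re-commit: once all current subtasks have
// checked in, pick the restored instant from any metadata-carrying event,
// regardless of how the restored state is spread across the new parallelism.
class OrderIndependentSketch {
  private final BootstrapEvent[] eventBuffer;

  OrderIndependentSketch(int writeTasks) {
    this.eventBuffer = new BootstrapEvent[writeTasks];
  }

  void handleBootstrapEvent(int subtaskId, BootstrapEvent event) {
    eventBuffer[subtaskId] = event;
    if (Arrays.stream(eventBuffer).allMatch(e -> e != null)) {
      Arrays.stream(eventBuffer)
          .map(e -> e.instantTime)
          .filter(t -> !t.isEmpty())   // skip empty bootstrap events
          .findFirst()
          .ifPresent(this::recommit);  // no-op if nothing was restored
    }
  }

  private void recommit(String instantTime) {
    // Commit the buffered metadata for instantTime to the timeline (elided).
  }

  static final class BootstrapEvent {
    final String instantTime; // "" for subtasks that restored no state
    BootstrapEvent(String instantTime) { this.instantTime = instantTime; }
  }
}
```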



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
