[ https://issues.apache.org/jira/browse/HUDI-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630183#comment-17630183 ]
KevinyhZou edited comment on HUDI-5159 at 11/8/22 5:17 AM:
-----------------------------------------------------------

OK, I have implemented this feature and will submit a PR in the next few days. You can help review it or point out what else needs to be added. [~complone]

was (Author: zouyunhe):
OK, I have implemented this feature and will submit a PR in the next few days. You can help point out what else needs to be added. [~complone]

> Support write a success file to partition when it finished in flink streaming append writer
> -------------------------------------------------------------------------------------------
>
> Key: HUDI-5159
> URL: https://issues.apache.org/jira/browse/HUDI-5159
> Project: Apache Hudi
> Issue Type: New Feature
> Components: connectors
> Reporter: KevinyhZou
> Priority: Major
>
> When we use a Flink streaming job to consume data from a message queue and write it to a Hudi partitioned table, we cannot know when a partition has finished being written. This information is often needed to tell the downstream offline task scheduler to run once the partition is complete.
> I think we can use the Flink watermark mechanism to implement this. The watermark is a lower bound on the event timestamps still to arrive in a Flink streaming job, so for an ordered stream, a watermark greater than the Hudi partition time means that all data for that partition has been written. That is the moment to write a success file to the partition path to mark the write as finished.
> It can be designed as follows.
> # Get the partition fields and their values from the Flink append streaming data; this can be implemented in AppendWriteFunction, which then emits them downstream.
> # Implement a SuccessFileWriteSink to receive these partition values and store them in an activePartitions set.
> # Compare the watermark timestamp with the partition timestamps converted from the activePartitions set; if the watermark is greater, add the partition to the finished partitions set.
> # Iterate over the finished partitions set, get each partition path, and write a success file to it when the Flink job takes a checkpoint.
> # Store the active partitions set and the finished partitions set in Flink state to avoid data loss when the job fails over.
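
The bookkeeping in steps 2 to 4 of the design can be sketched in plain Java, without any Flink or Hudi dependency. This is only an illustration under stated assumptions, not the implementation from the pending PR: the class name `PartitionSuccessTracker`, the day-granularity partition format (`2022-11-08`, UTC), the rule that a partition is finished once the watermark reaches the end of its day, and the marker name `_SUCCESS` are all hypothetical choices. In a real job the two sets would be kept in Flink state (step 5) rather than in fields.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not part of Hudi or Flink. */
class PartitionSuccessTracker {
    // In a real job both sets would be stored in Flink state (step 5).
    private final Set<String> activePartitions = new TreeSet<>();
    private final Set<String> finishedPartitions = new TreeSet<>();

    /** Step 2: remember a partition value seen in the incoming data. */
    void addPartition(String partition) {
        activePartitions.add(partition);
    }

    /** Assumption: a partition value such as "2022-11-08" covers one UTC day. */
    static long partitionEndMillis(String partition) {
        return LocalDate.parse(partition)
                .plusDays(1)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    /** Step 3: move partitions whose day the watermark has passed to the finished set. */
    void onWatermark(long watermarkMillis) {
        Iterator<String> it = activePartitions.iterator();
        while (it.hasNext()) {
            String partition = it.next();
            if (watermarkMillis >= partitionEndMillis(partition)) {
                it.remove();
                finishedPartitions.add(partition);
            }
        }
    }

    /** Step 4: at checkpoint time, write a marker file into each finished partition. */
    List<Path> onCheckpoint(Path tablePath) throws IOException {
        List<Path> written = new ArrayList<>();
        for (String partition : finishedPartitions) {
            Path dir = Files.createDirectories(tablePath.resolve(partition));
            Path marker = dir.resolve("_SUCCESS"); // assumed marker file name
            if (!Files.exists(marker)) {
                Files.createFile(marker);
            }
            written.add(marker);
        }
        finishedPartitions.clear();
        return written;
    }

    Set<String> activePartitions() {
        return activePartitions;
    }
}
```

Writing the marker only at checkpoint time, as the design proposes, ties the success file to data that has already been committed. Out-of-order data arriving after the marker is written is not handled by this sketch.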