[ https://issues.apache.org/jira/browse/HUDI-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Chen updated HUDI-8136:
-----------------------------
Description:
h2. Design
In the new design, instant time generation no longer blocks on checkpoint completion on the coordinator. Instead, writers cheaply obtain the instant time to use before every file write by sending a lightweight request to the coordinator. The coordinator maintains a mapping from checkpoint barriers (ids) to the instant times that writers use for the files written during each checkpoint. When a writer sends a new instant time request, the coordinator returns either an existing pending instant or a new one, based on a comparison between the writer's last finished checkpoint barrier and the barriers the coordinator currently holds. At most 2 pending instants are allowed on the coordinator. When an instant is committed, its pending mapping entry is removed from memory. Note that generating and switching to a new instant on checkpoint start, and serving instant times to writers, must happen in sequence in the coordinator (on the same thread or under an in-process lock).

The design assumes the following invariants:
* Once a writer task starts writing a file with time {{tx}}, it will not write any file with time {{ty}} such that {{ty < tx}}.
* Checkpoint {{i}} will be started on the coordinator only after checkpoint {{j}} completes, such that {{j < i}}.

Overall flow is as follows:
# During startup, the coordinator generates a new instant {{tx}} and requests it on the timeline.
## This happens under the process-local lock shared by instant time generation and instant time serving.
## Any instant time request from writer tasks is served {{tx}}.
# Writer tasks fetch the instant time to use for every file they write by issuing a call to the coordinator.
## Writer tasks keep track of all files they have written between checkpoints.
# When starting a checkpoint, the coordinator takes the lock again, generates a new instant {{ty}}, and requests it on the timeline.
## This ensures that all instant time fetches for {{tx}} are served before the checkpoint starts, i.e. the files written with {{tx}} will be reported back to the coordinator when the checkpoint completes.
# When they receive the checkpoint event, the writer tasks flush all open files and return the list of files written as part of the checkpoint.
# Once the coordinator receives the file lists from all writer tasks, those lists can contain two types of files:
## Files belonging to {{tx}}, which can now be committed, since no file with time {{tx}} could have been written between steps 3 and 5.
## Files belonging to {{ty}}, which cannot be committed yet (files for {{ty}} may still be being written), but the coordinator must ensure these files are eventually included when {{ty}} is committed.

> New instant time generation for Flink streaming pipeline
> --------------------------------------------------------
>
> Key: HUDI-8136
> URL: https://issues.apache.org/jira/browse/HUDI-8136
> Project: Apache Hudi
> Issue Type: Improvement
> Components: flink
> Reporter: Danny Chen
> Assignee: Danny Chen
> Priority: Major
> Fix For: 1.0.0
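The coordinator-side bookkeeping in the design above (the checkpoint-to-instant mapping, the at-most-2 pending bound, removal on commit, and one lock shared by instant switching and instant serving) can be illustrated with a minimal Java sketch. Everything here is hypothetical: {{InstantTimeServiceSketch}} and its methods are illustrative names, not Hudi or Flink APIs, and the {{Supplier<String>}} stands in for generating an instant and requesting it on the timeline. Keying each instant by the checkpoint whose completion commits it, the exact barrier comparison in {{requestInstant}}, and throwing when a third pending instant would be needed are assumptions made for the sketch.

```java
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the coordinator-side instant bookkeeping (not Hudi's actual classes).
 * Assumption: key k maps to the instant that is committed when checkpoint k completes.
 */
final class InstantTimeServiceSketch {
    private static final int MAX_PENDING = 2;

    // One lock serializes instant switching on checkpoint start and instant serving.
    private final ReentrantLock lock = new ReentrantLock();
    private final TreeMap<Long, String> pending = new TreeMap<>();
    // Hypothetical stand-in for "generate a new instant and request it on the timeline".
    private final Supplier<String> instantGenerator;

    InstantTimeServiceSketch(long lastCompletedCheckpoint, Supplier<String> instantGenerator) {
        this.instantGenerator = instantGenerator;
        lock.lock();
        try {
            // Step 1: the startup instant tx is committed by the next checkpoint.
            createIfAbsent(lastCompletedCheckpoint + 1);
        } finally {
            lock.unlock();
        }
    }

    /** Step 3: on checkpoint start, switch all later requests to a fresh instant ty. */
    void onCheckpointStart(long checkpointId) {
        lock.lock();
        try {
            createIfAbsent(checkpointId + 1);
        } finally {
            lock.unlock();
        }
    }

    /** Step 2: a writer asks which instant to use for its next file. */
    String requestInstant(long writerLastFinishedCheckpoint) {
        lock.lock();
        try {
            if (pending.isEmpty() || writerLastFinishedCheckpoint >= pending.lastKey()) {
                // Assumed comparison: the writer is past every barrier known here, so hand out a new instant.
                return createIfAbsent(writerLastFinishedCheckpoint + 1);
            }
            // Otherwise serve the newest pending instant, so a writer never moves back in time.
            return pending.lastEntry().getValue();
        } finally {
            lock.unlock();
        }
    }

    /** On checkpoint completion, the instant keyed by it is committed and dropped from memory. */
    String onCheckpointComplete(long checkpointId) {
        lock.lock();
        try {
            return pending.remove(checkpointId);
        } finally {
            lock.unlock();
        }
    }

    // Caller must hold the lock.
    private String createIfAbsent(long key) {
        String existing = pending.get(key);
        if (existing != null) {
            return existing;
        }
        if (pending.size() >= MAX_PENDING) {
            throw new IllegalStateException("More than " + MAX_PENDING + " pending instants");
        }
        String instant = instantGenerator.get();
        pending.put(key, instant);
        return instant;
    }
}
```

In this sketch the lock is what provides the ordering in step 3: a request runs either entirely before {{onCheckpointStart}} (and is served {{tx}}) or entirely after it (and is served {{ty}}). Under the invariants above, the cap of 2 is only hit if a checkpoint starts before the previous one completes; a real coordinator might wait rather than throw.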
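Step 5 of the flow, where the files reported at a checkpoint are split into those committable now ({{tx}}) and those that must be carried over until {{ty}} commits, can be sketched in Java as follows. {{CheckpointFileBuffer}}, its methods, and representing files as plain path strings grouped by instant are hypothetical choices for illustration, not Hudi APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of step 5 (not Hudi's actual classes): files reported at a
 * checkpoint are grouped by instant; the committable instant's files are released,
 * the rest are carried over until their own instant is committed.
 */
final class CheckpointFileBuffer {
    // Reported files per instant that have not been committed yet.
    private final Map<String, List<String>> uncommitted = new HashMap<>();

    /**
     * Called once the file lists from all writer tasks have arrived for a checkpoint.
     *
     * @param committableInstant the instant that can be committed now (tx in the design)
     * @param reported           files reported for this checkpoint, grouped by instant
     * @return all files to include in the commit of committableInstant
     */
    List<String> onAllWritersReported(String committableInstant, Map<String, List<String>> reported) {
        for (Map.Entry<String, List<String>> entry : reported.entrySet()) {
            uncommitted.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).addAll(entry.getValue());
        }
        // Files for later instants (ty) stay buffered; tx files reported earlier are included too.
        List<String> toCommit = uncommitted.remove(committableInstant);
        return toCommit == null ? new ArrayList<>() : toCommit;
    }

    /** Files already reported for an instant that is not committed yet. */
    List<String> carriedOverFiles(String instant) {
        return new ArrayList<>(uncommitted.getOrDefault(instant, List.of()));
    }
}
```

For example, if checkpoint 1 reports two {{tx}} files and one {{ty}} file, only the two {{tx}} files are committed; the {{ty}} file is returned together with the {{ty}} files reported at checkpoint 2 when {{ty}} is committed.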