[ https://issues.apache.org/jira/browse/HUDI-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Chen updated HUDI-8136:
-----------------------------
    Description: 
h2. Design
The new design stops blocking instant time generation on checkpoint completion
in the coordinator. Instead, writers cheaply obtain the instant time to use
before every file write by sending a lightweight request to the coordinator.
The coordinator maintains a mapping from checkpoint barriers (IDs) to the
instant times that writers use for the files written during each checkpoint.
When a writer sends an instant time request, the coordinator returns either an
existing pending instant or a new one, based on comparing the writer's last
finished checkpoint barrier with the barriers already held by the coordinator.
At most 2 pending instants are allowed on the coordinator. When an instant is
committed, its pending mapping entry should be removed from memory. Note that
generating (or switching to) a new instant time on checkpoint start and serving
instant times must happen sequentially in the coordinator (on the same thread
or under an in-process lock).
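A minimal Java sketch of the coordinator-side mapping, under stated assumptions: the names {{InstantTimeCoordinator}}, {{startCheckpoint}}, {{requestInstant}} and {{commit}} are hypothetical and not Hudi's API; instant times are modeled as strictly increasing longs rather than Hudi's timestamp strings; checkpoint IDs are assumed to start at 1, and key {{k}} holds the instant for files written after barrier {{k-1}} and before barrier {{k}}. Instants are generated eagerly, at startup and when a checkpoint starts, so each request is answered with an existing pending instant chosen by comparing the writer's last finished checkpoint with the coordinator's keys. {{synchronized}} stands in for the in-process lock.

```java
import java.util.TreeMap;
import java.util.function.LongSupplier;

// Hypothetical sketch, not Hudi's API: coordinator-side mapping from
// checkpoint IDs to pending instant times. Key k holds the instant used for
// files written after barrier k-1 and before barrier k (IDs assumed to start at 1).
class InstantTimeCoordinator {
  static final int MAX_PENDING_INSTANTS = 2;

  private final TreeMap<Long, Long> pending = new TreeMap<>();
  private final LongSupplier clock;
  private long lastInstant = Long.MIN_VALUE;

  InstantTimeCoordinator(LongSupplier clock) {
    this.clock = clock;
    // Startup: one instant for every file written before the first barrier.
    pending.put(1L, nextInstantTime());
  }

  // Checkpoint start: switch to a new instant for files written after this
  // barrier. `synchronized` stands in for the in-process lock that also
  // guards requestInstant, so switching and serving happen in sequence.
  synchronized long startCheckpoint(long checkpointId) {
    if (pending.size() >= MAX_PENDING_INSTANTS) {
      throw new IllegalStateException("previous instant not committed yet: " + pending);
    }
    long instant = nextInstantTime();
    pending.put(checkpointId + 1, instant);
    return instant;
  }

  // Writer request: pick the pending instant by comparing the writer's last
  // finished checkpoint with the checkpoint IDs known to the coordinator.
  synchronized long requestInstant(long lastFinishedCheckpointId) {
    Long instant = pending.get(lastFinishedCheckpointId + 1);
    if (instant == null) {
      throw new IllegalStateException(
          "no pending instant after checkpoint " + lastFinishedCheckpointId);
    }
    return instant;
  }

  // Commit: the in-memory mapping entry for this checkpoint is dropped.
  synchronized void commit(long checkpointId) {
    pending.remove(checkpointId);
  }

  synchronized int pendingCount() {
    return pending.size();
  }

  // Strictly increasing instant times, even if the clock stalls or goes back.
  private long nextInstantTime() {
    lastInstant = Math.max(lastInstant + 1, clock.getAsLong());
    return lastInstant;
  }
}
```

In this sketch a writer that has not yet processed barrier 1 keeps receiving {{tx}}, while one that has finished checkpoint 1 receives {{ty}}; starting checkpoint 2 before checkpoint 1's instant is committed is rejected by the two-instant limit.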
The design assumes the following invariants:
 * Once a writer task starts writing a file with time {{tx}}, it will not write any file with time {{ty}} such that {{ty < tx}}.
 * Checkpoint {{j}} will be started on the co-ordinator only after checkpoint {{i}} completes, such that {{i < j}}.
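Both invariants can be expressed as cheap runtime checks. The sketch below is hypothetical ({{WriterInstantGuard}} and {{CheckpointOrderGuard}} are illustrative names, not Hudi classes), models instant times as longs, and reads the second invariant as "a checkpoint may start only after every earlier checkpoint has completed", i.e. at most one checkpoint is in flight and checkpoint IDs increase. Aborted checkpoints are not modeled.

```java
// Hypothetical runtime checks for the two invariants; not Hudi classes.
// Instant times are modeled as longs.

// Invariant 1, kept per writer task: instants used for new files never go backwards.
class WriterInstantGuard {
  private long lastInstant = Long.MIN_VALUE;

  void beforeFileWrite(long instant) {
    if (instant < lastInstant) {
      throw new IllegalStateException(
          "instant went backwards: " + instant + " < " + lastInstant);
    }
    lastInstant = instant;
  }
}

// Invariant 2, kept on the coordinator: checkpoint j starts only after every
// checkpoint i < j has completed, so at most one checkpoint is in flight.
// Aborted checkpoints are not modeled.
class CheckpointOrderGuard {
  private long lastCompleted = 0;  // 0 means no checkpoint has completed yet
  private long inFlight = -1;      // -1 means no checkpoint is in flight

  void onStart(long checkpointId) {
    if (inFlight != -1 || checkpointId <= lastCompleted) {
      throw new IllegalStateException("checkpoint " + checkpointId + " started out of order");
    }
    inFlight = checkpointId;
  }

  void onComplete(long checkpointId) {
    if (checkpointId != inFlight) {
      throw new IllegalStateException("unexpected completion of checkpoint " + checkpointId);
    }
    lastCompleted = checkpointId;
    inFlight = -1;
  }
}
```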

 
Overall flow is as follows:
 # During startup, the co-ordinator generates a new instant {{tx}} and requests it on the timeline.
 ## This happens within a process-local lock shared by instant time generation and instant time serving.
 ## Any instant time request from writer tasks is answered with {{tx}}.
 # Writer tasks fetch the instant time to use for any file they write by issuing a call to the co-ordinator.
 ## Writer tasks keep track of all files they have written between checkpoints.
 # When starting a checkpoint, the co-ordinator takes the lock again, generates a new instant {{ty}} and requests it on the timeline.
 ## This ensures that all instant time fetches for {{tx}} are served before the checkpoint starts, i.e., the files written with {{tx}} will be reported back to the co-ordinator when the checkpoint completes.
 # When they receive the checkpoint event, the writer tasks flush all open files and return the list of files written as part of the checkpoint.
 # Once the co-ordinator has received the file lists from all writer tasks, these lists can include two types of files:
 ## files belonging to {{tx}}, which can now be committed, since we know no files with time {{tx}} could have been written between steps 3 and 5;
 ## files belonging to {{ty}}, which cannot be committed yet (there could be files still being written for {{ty}}), but the co-ordinator needs to ensure these files are ultimately included when {{ty}} is committed.
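The bookkeeping in step 5 could look like the following minimal sketch. {{WrittenFile}}, {{CheckpointCommitBuffer}} and {{onCheckpointComplete}} are hypothetical names; instants are modeled as longs, and the caller is assumed to pass the instant being committed ({{tx}}) for the completed checkpoint. Files for that instant, including any carried over from earlier reports, are returned for the commit; files for newer instants such as {{ty}} stay buffered until their own commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: a file reported by a writer task, tagged with its instant time.
final class WrittenFile {
  final String path;
  final long instant;

  WrittenFile(String path, long instant) {
    this.path = path;
    this.instant = instant;
  }
}

// Hypothetical coordinator-side buffer for step 5 of the flow.
class CheckpointCommitBuffer {
  // instant time -> files reported for it but not yet committed
  private final Map<Long, List<String>> uncommitted = new HashMap<>();

  // Takes the file lists from all writer tasks for a completed checkpoint and
  // returns the files to commit for committingInstant (tx in the flow).
  List<String> onCheckpointComplete(long committingInstant, List<List<WrittenFile>> reports) {
    for (List<WrittenFile> report : reports) {
      for (WrittenFile file : report) {
        if (file.instant < committingInstant) {
          // Earlier instants are assumed committed already; a late file
          // for one of them signals a protocol violation.
          throw new IllegalStateException("file for committed instant: " + file.path);
        }
        uncommitted.computeIfAbsent(file.instant, k -> new ArrayList<>()).add(file.path);
      }
    }
    // Files for the committing instant (including ones carried over from
    // earlier checkpoints) are committed now; newer instants stay buffered.
    List<String> toCommit = uncommitted.remove(committingInstant);
    return toCommit == null ? new ArrayList<>() : toCommit;
  }

  int bufferedFileCount(long instant) {
    List<String> files = uncommitted.get(instant);
    return files == null ? 0 : files.size();
  }
}
```

Keying the buffer by instant rather than by checkpoint is what lets {{ty}} files reported at one checkpoint be picked up automatically when {{ty}} is committed at the next one.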

 

> New instant time generation for Flink streaming pipeline
> --------------------------------------------------------
>
>                 Key: HUDI-8136
>                 URL: https://issues.apache.org/jira/browse/HUDI-8136
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: flink
>            Reporter: Danny Chen
>            Assignee: Danny Chen
>            Priority: Major
>             Fix For: 1.0.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
