vinothchandar commented on code in PR #7907:
URL: https://github.com/apache/hudi/pull/7907#discussion_r1361412028

##########
rfc/rfc-66/rfc-66.md:
##########
@@ -0,0 +1,318 @@
+# RFC-66: Non-blocking Concurrency Control
+
+## Proposers
+- @danny0405
+- @ForwardXu
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+As you know, Hudi already supports basic OCC with abundant lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because conflicts happen at a very high frequency.
+To expand on that a little: with a hashing index, all the writers use a deterministic hashing algorithm to distribute the records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single data flush in one writer, almost all the data buckets are appended with new inputs,
+so conflicts are very likely with multiple writers, because almost all the data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a small-file load rebalance strategy that writes into the **small** buckets with higher priority,
+which means multiple writers are prone to write into the same **small** buckets at the same time. That is how conflicts happen.
+
+In general, for ingestion with multiple streaming writers, OCC is not very feasible in production. In this RFC, we propose a non-blocking solution for streaming ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: they have no pipeline orchestration complexity and a smoother write workload.
+Most of the raw data sets we handle today are generated continuously in a streaming fashion.
+
+Based on that, many requests for multi-writer ingestion have come up. With multi-writer ingestion, several streams of events with the same schema can be drained into one Hudi table,
+and the Hudi table effectively becomes a UNION table view of all the input data sets.
This is a very common use case because in reality, the data sets are usually scattered across many data sources.
+
+Another very useful use case we want to unlock is the real-time data set join. One of the biggest pain points in streaming computation is the data set join:
+an engine like Flink has basic support for all kinds of SQL JOINs, but it stores the input records in its internal state backend, which is a huge cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced a `PartialUpdateAvroPayload`. In combination with the lockless multi-writer,
+we can implement N-way joins of data sources in real time! Hudi would take care of the payload join during the compaction service procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict resolution to the compaction phase. The compaction service resolves conflicts between the same keys by respecting the event time sequence of the events.
+
+#### Deterministic Bucketing Strategy
+
+A deterministic bucketing strategy is required, because the same record keys from different writers need to be distributed into the same bucket, not only for UPSERTs, but also for all new INSERTs.
+
+#### Lazy Cleaning Strategy
+
+Configure the cleaning strategy as lazy so that pending instants are not rolled back by the other active writers.
+
+### Basic Work Flow
+
+#### Writing Log Files Separately In Sequence
+
+Basically, each writer flushes its log files in sequence, and the log file rolls over to a new version number.
+A pivotal thing to note here is that we need to make the write_token unique for log files of the same version with the same base instant time,
+so that the file names do not conflict across writers.
+
+The log files generated by a single writer still preserve their sequence through the version number, which is important if the natural order is needed for single-writer events.
+
+![multi-writer](multi_writer.png)
+
+### The Compaction Procedure
+
+The compaction service is the component that actually resolves the conflicts. Within a file group, it sorts the files and then merges all the record payloads for a record key.
+The event time sequence is respected by combining the payloads using the event time field provided by the payload (known as the `preCombine` field in Hudi).
+
+![compaction procedure](compaction.png)
+
+#### Non-Serial Compaction Plan Schedule
+Currently, compaction plan scheduling must be serialized with the writers, which means that while the compaction plan is being scheduled, no ongoing writer should be writing to
+the table. This restriction makes compaction almost impossible for multiple streaming writers, because with streaming ingestion there is always an instant writing to the table.
+
+In order to unblock compaction plan scheduling and keep the readers' view complete, we introduce the completion time for file slice generation:
+
+- Support quick look-up from instant time ➝ completion time. [HUDI-6539](https://issues.apache.org/jira/browse/HUDI-6539) supports fast completion time queries on the archived timeline; based on this, we are able
+to support flexible completion time queries on both the active and archived timelines, see [HUDI-6725](https://issues.apache.org/jira/browse/HUDI-6725);
+- New compaction plan scheduling that complies with the completion time, that is: only log files whose completion time is smaller than the compaction start instant time should be considered
+- New file slice generation strategy: a log file with a smaller instant time (than the compaction instant time) but a greater completion time should be assigned to a new file slice
+- By combining #2 and #3, in general, we are slicing by comparing the compaction start time with the log files' completion time.
+
+<img src="non_serial_compaction.png" alt="drawing" width="400"/>
+
+Assume we have two log files, with instant time & completion time as [t1, t3] and [t2, t5]. At t4, a compaction plan was scheduled;
+the plan does not include file t2_t5_v1.w2. In the reader view, the log file should be assigned to a different file slice than the t4 instant time.
+
+#### Global Monotonically Increasing Timestamp
+
+In order to make time deterministic across cloud storages, we use a logical time generated by a special **TimeGenerator**, see [HUDI-1623](https://issues.apache.org/jira/browse/HUDI-1623) for details.
+
+#### Assumption On Locks
+In order to get globally monotonically increasing time generation, we may introduce lock providers for the instant time generation and the creation of the completed metadata file.
+That means, for each instant time/completion time generation request, there is a try-lock action.
As a special case, we store the completion time as part of the completed
+metadata file name. The time generation and the file creation should be atomic together, so they should be guarded by the same lock as a regular time generation request.
+
+### The Log File Naming Convention
+
+We use the current instant time instead of the base commit time in the file name,
+so that tasks from different writers cannot conflict on file names. We can also parse the file name to fetch the instant time quickly.
+Finally, the log file name follows this pattern:
+
+```shell
+${uuid}_${delta_commit instant time}.log.${version}_${task_token}
+```
+
+### The Sorting Rules for Log Files from Different Writers
+
+The sorting rules are important because they decide the natural processing order,
+especially when the event time fields are the same and we don't know which payload to choose when combining.
+Here we can keep using the log file name comparator of the current codebase, that is:
+
+```sql
+order by delta_commit_time, version_number, write_token
+```
+
+For a single writer, the natural order of its log files is preserved by the auto-increasing version_number.
+For multiple writers, we still try our best to preserve the natural order with the version number,
+but the write_token has deterministic priority in sorting, which breaks the file generation order.
+
+![log file sequence](log_file_sequence.png)
+
+### Format changes
+
+| Type | Changes |
+|----------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| Commit/compaction metadata | No changes |
+| Commit file name | We are only going to add the completion time to the completed metadata file name for all actions |
+| Log file format | Adding deltacommit instant time to log file name.
The file name does not contain base instant time; No changes to log block format |
+
+### Commit Protocol
+
+| Type | Content |
+|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------|
+| Writer expectations | Writer does not need to look up the latest file slice to fetch the base commit time anymore, it always uses the current instant time directly |
+| Conflict Resolution | No explicit conflict resolution, it is handled by the compactor |
+| Reader expectations | Readers still query based on file slices, see the pseudocode for new file slicing |
+| Synchronization and correctness | Still need some mutex on commit log (timeline). Getting the completion time and adding to the timeline needs to happen atomically |
+
+**Note:** Conflict resolution for clustering and updates is not in the scope of non-blocking concurrency control; you still need OCC for coarse-grained failures and retries.
+
+## Appendix
+
+### Pseudocode for New File Slicing
+
+Assume we have the following inputs to build the file slices:
+1. `base_files_by_file_id`: a map of filegroup id to list of base files.
+2. `log_files_by_file_id`: a map of filegroup id to list of log files.
+3. `timeline`: Hudi write timeline.
+
+The pseudocode below introduces a notion of **file slice barriers**, which contain a list of instant (start) times of
+the base files in descending order. Barriers will help in demarcating file slices. The below pseudocode builds the file
+slices per file group. Building file groups from file slices is not shown but can be easily done.
+
+```python
+# new file slicing
+def build_file_slices(base_files_by_file_id, log_files_by_file_id, timeline):
+    # get set of all filegroup ids
+    file_id_set = base_files_by_file_id.keys
+    file_id_set.add_all(log_files_by_file_id.keys)
+
+    for file_id in file_id_set:
+        # sort the base files by descending order of instant (start) time, i.e. last written base file first
+        base_files_in_file_id = base_files_by_file_id[file_id].sort(BASE_FILE_REVERSE_COMPARATOR)
+        # filter out log files that have been log-compacted
+        log_files_in_file_id = handle_log_compaction(log_files_by_file_id[file_id], timeline)
+        # sort the log files by ascending order of completion time
+        log_files_in_file_id.sort(LOG_FILE_COMPARATOR)
+        # get list of file slice barriers for this filegroup id
+        file_slice_barriers = get_file_slice_barriers(base_files_in_file_id, log_files_in_file_id)
+        # build file slices
+        file_slices = []
+        for log_file in log_files_in_file_id:
+            file_slice = find_file_slice(log_file, file_slice_barriers)
+            file_slices.add(file_slice)
+
+
+# Given all log files for a file id, filter out such log files that have been log-compacted.
+def handle_log_compaction(log_files_in_file_id, timeline):
+    log_compaction_instants = timeline.get_completed_log_compaction()
+    for log_compaction_instant in log_compaction_instants:
+        log_files_compacted = get_log_files_compacted(log_compaction_instant)
+        log_files_in_file_id.remove(log_files_compacted)
+
+    return log_files_in_file_id
+
+
+# Given base files and log files for a filegroup id, return a list containing file slice barriers.
+def get_file_slice_barriers(base_files_in_file_id, log_files_in_file_id):
+    file_slice_barriers = []
+    if base_files_in_file_id.size > 0:
+        for base_file in base_files_in_file_id:
+            file_slice_barriers.add(instant_time(base_file))

Review Comment:
This is so that we can associate any log file with completion time < instant_time of `base_file` with the previous file slice.

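To make the comment above concrete, here is a minimal runnable sketch of how the barrier list could be built. The function name follows the RFC pseudocode; representing instant times as plain integers and log files as `(instant_time, completion_time)` tuples is an assumption made for illustration only, not a Hudi API.

```python
def get_file_slice_barriers(base_instants, log_files):
    """Return file slice barriers in descending order (latest base file first).

    base_instants: instant (start) times of the base files of one file group.
    log_files: (instant_time, completion_time) tuples for the same file group.
    Both are hypothetical integer stand-ins for Hudi instant times.
    """
    if base_instants:
        # every base file's instant time is a barrier
        return sorted(base_instants, reverse=True)
    if log_files:
        # no base file yet: use the instant time of the log file that completed first
        earliest = min(log_files, key=lambda f: f[1])
        return [earliest[0]]
    return []


barriers = get_file_slice_barriers([10, 40], [(12, 20), (35, 38), (41, 45)])
print(barriers)  # [40, 10]
```

With barriers `[40, 10]`, the log file `(35, 38)` completed before the base file at 40 started, so its completion time is below that barrier and it stays with the slice anchored at 10.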
##########
rfc/rfc-66/rfc-66.md:
##########
@@ -0,0 +1,318 @@
+def get_file_slice_barriers(base_files_in_file_id, log_files_in_file_id):
+    file_slice_barriers = []
+    if base_files_in_file_id.size > 0:
+        for base_file in base_files_in_file_id:
+            file_slice_barriers.add(instant_time(base_file))
+    elif log_files_in_file_id.size > 0:
+        # for a file group with no base file, the instant time of the earliest log file is the barrier
+        file_slice_barriers.add(instant_time(log_files_in_file_id[0]))
+
+    return file_slice_barriers
+
+
+def find_file_slice(log_file, file_slice_barriers):
+    completion_time = completion_time(log_file)
+    for barrier in file_slice_barriers:
+        if (barrier < completion_time):
+            # each file slice is attached to a barrier, returns the existing file slice or a fresh new one based on the barrier.
+            # note that since file_slice_barriers is reverse sorted, we would return the file slice
+            # corresponding to the max barrier just less than the completion_time

Review Comment:
i.e. the max barrier that is less than the log's completion time.
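
The lookup described in this comment can be sketched as follows. This is only an illustration under stated assumptions: instants are plain integers, log files are `(instant_time, completion_time)` tuples, the helper names `find_barrier` and `group_logs_by_barrier` are hypothetical, and returning `None` when no barrier qualifies is a guess, since the quoted pseudocode is cut off before that case.

```python
from collections import defaultdict


def find_barrier(completion_time, barriers_desc):
    """Return the max barrier strictly less than completion_time, or None."""
    # barriers_desc is sorted in descending order, so the first match is the max
    for barrier in barriers_desc:
        if barrier < completion_time:
            return barrier
    return None


def group_logs_by_barrier(log_files, barriers_desc):
    """Group (instant_time, completion_time) log files by their slice barrier."""
    slices = defaultdict(list)
    # process log files in ascending completion time, as the pseudocode does
    for log_file in sorted(log_files, key=lambda f: f[1]):
        slices[find_barrier(log_file[1], barriers_desc)].append(log_file)
    return dict(slices)


# RFC example: logs [t1, t3] and [t2, t5], compaction scheduled at t4;
# a base file at t0 = 0 is assumed so that the first log has a slice to join.
slices = group_logs_by_barrier([(1, 3), (2, 5)], [4, 0])
print(slices)  # {0: [(1, 3)], 4: [(2, 5)]}
```

The second log file started before the compaction instant but completed after it, so it lands in the slice anchored at 4, in line with the RFC rule that such a log file is assigned to a new file slice.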