Hi Hudi community! This is a proposal for an optimization for wide tables in 
cases where multiple writers each write their own subset of fields.

In stream processing there are often scenarios where a table is widened. 
Today the mainstream way to build wide tables in real time is a multi-level 
Flink join, and Flink's join caches a large amount of data in the state 
backend. As the data set grows, the pressure on the Flink task's state 
backend gradually increases, and the job may even become unavailable.
Example: multiple Flink streams, each writing its own fields, with the data 
written by the streams combined into a complete wide table at read time (see 
the sketch below):
Flink job1 writes fields: joinkey, a, b, c 
Flink job2 writes fields: joinkey, d, e, f
Flink job3 writes fields: joinkey, g, h
When reading, the output is merged and widened: joinkey, a, b, c, d, e, f, g, h
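
For illustration, here is a minimal Java sketch of that read-time result: 
three partial rows sharing a joinkey are stitched into one wide row. The names 
here (WideRowMerge, the Map-based row representation) are invented for the 
example and are not part of the proposed implementation.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Stitches partial rows (each carrying a subset of columns) that share
    // the same joinkey into a single wide row. Purely illustrative.
    public class WideRowMerge {
        @SafeVarargs
        static Map<String, Object> merge(Map<String, Object>... partialRows) {
            Map<String, Object> wideRow = new LinkedHashMap<>();
            for (Map<String, Object> partial : partialRows) {
                wideRow.putAll(partial); // column sets overlap only on joinkey
            }
            return wideRow;
        }

        public static void main(String[] args) {
            Map<String, Object> job1 = Map.of("joinkey", 1, "a", "a1", "b", "b1", "c", "c1");
            Map<String, Object> job2 = Map.of("joinkey", 1, "d", "d1", "e", "e1", "f", "f1");
            Map<String, Object> job3 = Map.of("joinkey", 1, "g", "g1", "h", "h1");
            // Prints all of joinkey, a..h in one row (field order may vary).
            System.out.println(merge(job1, job2, job3));
        }
    }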

The layout of Hudi files is divided according to the following rules: data in 
a partition is divided into buckets by hash; the files in each bucket are 
divided by column family; the multiple column family files in a bucket 
together form a complete fileGroup; when there is only one column family, this 
degenerates into the native Hudi bucket table. The data belonging to a given 
column family is sorted and written directly by each writer to its 
corresponding column family log file. The reading process involves a large 
amount of data merging, but because the data itself is sorted, the memory 
consumption of the merge is very low and the merge is fast (see the sort-merge 
sketch below).
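
To make the low memory footprint concrete, here is a rough Java sketch of 
such a k-way sort-merge, assuming each column family file yields records 
already sorted by key. The heap holds at most one pending record per file, so 
memory is proportional to the number of files, not the data size. Entry, Head, 
and the Map-based row are invented names for this sketch, not the actual 
implementation.

    import java.util.*;

    // Illustrative k-way sort-merge over per-column-family record streams
    // that are already sorted by key.
    public class SortMergeSketch {
        record Entry(String key, Map<String, Object> columns) {}
        record Head(Entry entry, Iterator<Entry> rest) {}

        static List<Map<String, Object>> merge(List<Iterator<Entry>> familyFiles) {
            // One heap slot per column family file.
            PriorityQueue<Head> heap =
                new PriorityQueue<>(Comparator.comparing((Head h) -> h.entry().key()));
            for (Iterator<Entry> it : familyFiles) {
                if (it.hasNext()) heap.add(new Head(it.next(), it));
            }
            List<Map<String, Object>> out = new ArrayList<>();
            String currentKey = null;
            Map<String, Object> currentRow = null;
            while (!heap.isEmpty()) {
                Head h = heap.poll();
                Entry e = h.entry();
                if (!e.key().equals(currentKey)) {   // new key: start a new wide row
                    currentKey = e.key();
                    currentRow = new LinkedHashMap<>();
                    currentRow.put("joinkey", currentKey);
                    out.add(currentRow);
                }
                currentRow.putAll(e.columns());      // stitch in this family's columns
                if (h.rest().hasNext()) heap.add(new Head(h.rest().next(), h.rest()));
            }
            return out;
        }
    }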

Constraints
1. The overall design relies on the lock-free concurrent writing feature of 
Hudi release 1.0.
2. Older versions of Hudi cannot read or write column family tables; newer 
versions of Hudi can read and write tables written by older versions.
3. Because column families are supported, write performance for full rows is 
reduced, while writes that update only a few columns become much faster and 
their IO drops significantly.
4. Only MOR bucket tables support setting column families; other table types 
do not support it and do not need to.
5. Column families do not support repartitioning or renaming.
6. Schema evolution does not take effect on column family tables.
7. Clustering operations are not supported.

Performance impact
Writing:
An ordinary write must split the incoming rows by column family and sort them; 
full-row writes are therefore slower than native bucket writes. However, when 
only a few of many columns are updated, writes are much faster than on a 
non-column-family table (see the write-path sketch below).
Reading:
Since the data is already sorted at write time, a sort-merge can be used 
directly to combine it; compared with native bucket reads, read performance 
improves significantly and memory consumption is reduced.
Compaction:
Compaction uses the same merge logic as reading.
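
As a rough illustration of the extra write-side work, the sketch below 
projects each full row onto its column families and sorts each family's batch 
by key before it would be appended to that family's log file. The method and 
mapping names are hypothetical, not the real write path.

    import java.util.*;

    // Splits full rows into per-column-family subsets and sorts each family's
    // batch by key, ready to append to that family's log file. Illustrative.
    public class ColumnFamilySplitSketch {
        static Map<String, List<Map<String, Object>>> splitAndSort(
                List<Map<String, Object>> rows,
                Map<String, List<String>> familyToColumns) {
            Map<String, List<Map<String, Object>>> perFamily = new LinkedHashMap<>();
            for (Map<String, Object> row : rows) {
                for (var family : familyToColumns.entrySet()) {
                    Map<String, Object> subset = new LinkedHashMap<>();
                    subset.put("joinkey", row.get("joinkey")); // key goes to every family
                    for (String col : family.getValue()) {
                        if (row.containsKey(col)) subset.put(col, row.get(col));
                    }
                    perFamily.computeIfAbsent(family.getKey(), k -> new ArrayList<>())
                             .add(subset);
                }
            }
            // Sort each family's batch so its log file stays sorted for the read path.
            perFamily.values().forEach(batch ->
                batch.sort(Comparator.comparing(
                    (Map<String, Object> r) -> r.get("joinkey").toString())));
            return perFamily;
        }
    }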
===

If you are interested in this feature and think it would be useful, let me 
know in a reply to this message. I'll create an umbrella ticket in Jira with a 
decomposition into development tasks and start writing an RFC on GitHub with 
architectural and implementation details. I'm also ready to implement the 
whole feature myself.
