Hi Hudi community! This is a proposal for optimizing wide tables in the case where multiple writers each write their own subset of fields.
In stream processing there are often scenarios where a table is widened. Today this real-time widening is mostly done with Flink's multi-level joins, and Flink's join operator caches a large amount of data in the state backend. As the data set grows, the pressure on the Flink task's state backend steadily increases, and the job may even become unavailable.

Example: multiple Flink streams, each writing its own fields, are finally combined into one complete wide table:
Flink job1 writes fields: joinkey, a, b, c
Flink job2 writes fields: joinkey, d, e, f
Flink job3 writes fields: joinkey, g, h
When reading, the data is merged into the widened output: joinkey, a, b, c, d, e, f, g, h

The layout of Hudi files follows these rules: data in a partition is divided into buckets by hash; the files in each bucket are split by column family; the column family files in a bucket together form one complete file group; when there is only one column family, this degenerates into a native Hudi bucket table. The data belonging to a column family is sorted and written directly by each writer to its corresponding column family log file. Reading involves a large amount of merging, but because the data itself is sorted, the memory consumption of the merge is very low and the merge is fast.

Constraints
1. The overall design relies on the lock-free concurrent writing feature of Hudi release 1.0.
2. Older Hudi versions cannot read or write column family tables; newer Hudi versions can still read and write tables created by older versions.
3. Because rows are split by column family, writing a full row becomes slower; writes that update only a few columns become much faster, and IO is significantly reduced.
4. Only MOR bucket tables support setting column families.
Other types of tables do not support it and do not need to.
5. Column families do not support repartitioning or renaming.
6. Schema evolution does not take effect on column family tables.
7. Clustering operations are not supported.

Performance impact
Writing: an ordinary write has to split the row by column family and sort, so writing full rows is slower than native bucket writes. However, when only some columns of a wide table are updated, writes are much faster than on a non-column-family table.
Reading: since the data is already sorted at write time, a sort-merge can be used directly; compared with native bucket reads, performance improves significantly and memory consumption drops.
Compaction: compaction uses the same merge logic as reading.

===

If you are interested in this feature and think it's useful, let me know in response to this letter. I'll create an umbrella ticket in Jira with a breakdown of the development tasks and start writing an RFC on GitHub with architectural and implementation details. I'm also ready to implement the whole feature myself.
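P.S. To make the write path concrete, here is a minimal plain-Java sketch of the split step. The family-to-columns mapping, class name, and string-map row representation are all assumptions for illustration only; the real configuration syntax and record types would be defined in the RFC. It shows a row being hashed into a bucket and split into per-family partial rows, with each family's batch sorted by join key before it would be appended to that family's log file.

```java
import java.util.*;

public class ColumnFamilySplitSketch {
    // Hypothetical column family definitions (assumption, not real Hudi
    // config): family name -> the columns that family owns.
    static final Map<String, List<String>> FAMILIES = new LinkedHashMap<>();
    static {
        FAMILIES.put("cf1", List.of("a", "b", "c"));
        FAMILIES.put("cf2", List.of("d", "e", "f"));
        FAMILIES.put("cf3", List.of("g", "h"));
    }

    // Hash the join key into a bucket, as in a bucket-index table.
    static int bucketOf(String joinKey, int numBuckets) {
        return Math.floorMod(joinKey.hashCode(), numBuckets);
    }

    // Split one incoming row into per-family partial rows. Each partial row
    // keeps the join key so the families can be stitched back together on read.
    static Map<String, Map<String, String>> split(Map<String, String> row) {
        Map<String, Map<String, String>> parts = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> fam : FAMILIES.entrySet()) {
            Map<String, String> part = new LinkedHashMap<>();
            part.put("joinkey", row.get("joinkey"));
            for (String col : fam.getValue()) {
                if (row.containsKey(col)) part.put(col, row.get(col));
            }
            // A family with no updated columns gets no log entry at all,
            // which is where the IO savings for partial updates come from.
            if (part.size() > 1) parts.put(fam.getKey(), part);
        }
        return parts;
    }

    // Sort a family's batch by join key before flushing it to the family's
    // log file, so reads can later stream a sort-merge with little memory.
    static void sortBatch(List<Map<String, String>> batch) {
        batch.sort(Comparator.comparing(r -> r.get("joinkey")));
    }
}
```

Note how a write that touches only columns d and e produces a partial row for cf2 alone, leaving the cf1 and cf3 log files untouched.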
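P.P.S. And a minimal sketch of the read path under the same assumptions (hypothetical class name and string-map rows, for illustration only): because every family's log is already sorted by join key, a streaming k-way sort-merge can stitch the partial rows into wide rows while holding only one cursor per family in memory, which is why the merge is cheap compared with a stateful streaming join.

```java
import java.util.*;

public class SortMergeWidenSketch {
    // Each "family" is a list of rows sorted by join key; every row is a
    // field->value map that includes "joinkey". This mirrors, in miniature,
    // how per-column-family log files are widened on read with a streaming
    // sort-merge instead of buffering whole inputs.
    static List<Map<String, String>> widen(List<List<Map<String, String>>> families) {
        int n = families.size();
        int[] pos = new int[n]; // one cursor per sorted family
        List<Map<String, String>> out = new ArrayList<>();
        while (true) {
            // Find the smallest join key among the current cursors.
            String min = null;
            for (int i = 0; i < n; i++) {
                if (pos[i] < families.get(i).size()) {
                    String k = families.get(i).get(pos[i]).get("joinkey");
                    if (min == null || k.compareTo(min) < 0) min = k;
                }
            }
            if (min == null) break; // all families exhausted
            // Stitch every family's fields for that key into one wide row.
            // A key missing from some family simply contributes no columns.
            Map<String, String> row = new TreeMap<>();
            for (int i = 0; i < n; i++) {
                if (pos[i] < families.get(i).size()
                        && min.equals(families.get(i).get(pos[i]).get("joinkey"))) {
                    row.putAll(families.get(i).get(pos[i]));
                    pos[i]++;
                }
            }
            out.add(row);
        }
        return out;
    }
}
```

Only one record per family is inspected at a time, so memory use is O(number of families) regardless of file size; the same loop would serve compaction, since it shares the read-side merge logic.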