I got some feedback that this thread may be a bit complex to understand, so I tried to simplify the proposal below:
Users can already specify 'partitionpath' using this config <https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY> when writing data. My proposal is that we also give users the ability to identify (or hint at) the 'fileId' while writing the data. For example, users can say 'locality.columns: session_id'. We deterministically map every session_id to a specific fileGroup in Hudi (using hash-modulo, range-partitioning, etc.), so all values for a session_id are co-located in the same data/log file. Hopefully this explains the idea better. Appreciate any feedback. (A minimal code sketch of this mapping follows the quoted email below.)

On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <[email protected]> wrote:

> Hello,
>
> Clustering <https://hudi.apache.org/blog/hudi-clustering-intro/> is a great feature for improving data locality. But it has a (relatively big) cost: rewriting the data after ingestion. I think there are other ways to improve data locality during ingestion. For example, we can add a new Index (or partitioner) that reads values for the columns that are important from a data locality perspective. We could then compute a hash modulo on the value and use that to deterministically identify the file group that the record has to be written into.
>
> More detailed example:
> Assume we introduce 2 new configs:
>
> hoodie.datasource.write.num.file.groups: "N" # Controls the total number of fileIds allowed (per partition).
>
> hoodie.datasource.write.locality.columns: "session_id,timestamp" # Identifies columns that are important for data locality.
>
> (I can come up with better names for the configs if the general idea sounds good.)
>
> During ingestion, we generate 'N' fileIds for each partition (if that partition already has K fileIds, we generate N-K new fileIds). Let's say these fileIds are stored in a fileIdList data structure. For each row, we compute 'hash(row.get(session_id)+row.get(timestamp)) % N'. This value is used as the index into the fileIdList data structure to deterministically identify the file group for the row.
>
> This improves data locality by ensuring rows with a given column value are stored in the same file. This hashing could be done in two places:
> 1) A custom index that tags the location for each row based on the values of 'session_id+timestamp'.
> 2) A new partitioner that assigns buckets for each row based on the values of 'session_id+timestamp'.
>
> *Advantages:*
> 1) No need to rewrite data to improve data locality.
> 2) Integrates well with Hive bucketing (Spark is also adding support for Hive bucketing <https://issues.apache.org/jira/browse/SPARK-19256>).
> 3) This reduces scan cycles to find a particular key because it ensures that the key is present in a specific fileId. Similarly, joins across multiple tables would be efficient if both tables choose the same 'locality.columns'.
>
> *Disadvantages:*
> 1) Users need to know the total number of file groups to generate per partition. This value is assumed to be static for all partitions, so if significant changes in traffic volume are expected, this may not partition the data well. (We could also consider making this configurable per partition, which adds complexity but is feasible.)
> 2) This may not be as efficient as clustering. For example, data for a given column value is guaranteed to be co-located in the same file, but it may not be in the same block (row group in Parquet), so query engines may need to read more blocks.
>
> Clustering can still be useful for other use cases such as stitching files, transforming data for efficiency, etc. Clustering can also be useful for a few sorting scenarios - e.g., if users cannot predict a good value for the number of file groups needed.
>
> Appreciate any feedback. Let me know if you have other ideas on improving data locality. If you are interested in this idea and want to collaborate, please reach out.
>
> Thanks
> Satish
>
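For concreteness, here is a minimal Java sketch of the hash-modulo mapping described above. The class name, method signatures, and the assumption that fileIds are plain UUID strings are all hypothetical illustration for this thread; this is not an existing Hudi API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of the proposed deterministic file-group assignment.
// Names and structure are illustrative only, not part of Hudi.
public class LocalityFileGroupAssigner {

  private final List<String> fileIdList; // the N fileIds for one partition

  public LocalityFileGroupAssigner(List<String> existingFileIds, int numFileGroups) {
    // Reuse the K fileIds the partition already has; generate N-K new ones.
    this.fileIdList = new ArrayList<>(existingFileIds);
    for (int i = existingFileIds.size(); i < numFileGroups; i++) {
      fileIdList.add(UUID.randomUUID().toString());
    }
  }

  // Concatenate the locality column values, hash, and take the modulo
  // to pick an index into fileIdList.
  public String assignFileId(String sessionId, String timestamp) {
    int hash = (sessionId + timestamp).hashCode();
    // Math.floorMod keeps the index non-negative even if hashCode is negative.
    int bucket = Math.floorMod(hash, fileIdList.size());
    return fileIdList.get(bucket);
  }
}
```

Because the bucket is a pure function of the locality column values, every record with the same (session_id, timestamp) lands in the same file group across commits, which is what gives the co-location and join-friendliness described in the advantages above.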
