Hello All,

I would like to sort the records in each file of a COW table by a given key while ingesting/writing data - I am using the Spark data source with Kafka (Structured Streaming). Hudi already does a great job of getting each file to its optimal size (by compacting and appending data to smaller files), so when a file reaches its optimal size, the records in that file should also be sorted if a sort/order-by column is provided. That way I don't have to do another round of processing on the already size-optimized file.

Regards,
Felix K Jose
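For context, a minimal sketch of the kind of Kafka-to-Hudi COW Structured Streaming write described above, using the standard Spark data source options; the topic, schema, and field names (events, record_id, event_ts, region) are illustrative assumptions, and none of these options sorts records within a file by a user-chosen column - that is the behavior being requested:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("hudi-cow-ingest").getOrCreate()

// Hypothetical event schema carried in the Kafka message value.
val schema = StructType(Seq(
  StructField("record_id", StringType),
  StructField("event_ts", LongType),
  StructField("region", StringType),
  StructField("payload", StringType)
))

// Read the stream from Kafka and project the JSON value into columns.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("e"))
  .select("e.*")

// Standard Hudi COW upsert via Structured Streaming; Hudi handles file
// sizing, but there is no option here to sort records inside each file.
events.writeStream
  .format("hudi")
  .option("hoodie.table.name", "events_cow")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "record_id")
  .option("hoodie.datasource.write.precombine.field", "event_ts")
  .option("hoodie.datasource.write.partitionpath.field", "region")
  .option("checkpointLocation", "/tmp/hudi/checkpoints/events_cow")
  .outputMode("append")
  .start("/tmp/hudi/events_cow")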
From: Rubens Rodrigues <[email protected]>
Date: Tuesday, February 9, 2021 at 8:36 PM
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] Improve data locality during ingestion

Hi guys,

Talking about my use case: I have datasets where ordering the data by date makes a lot of sense, or ordering by some id so that merge operations touch fewer files. When I used Delta Lake I used to bootstrap tables always ordered by one of these fields, and it helped a lot with file pruning. Hudi clustering does this job, but I think it is an unnecessary extra step after bulk insert, because all the data has to be rewritten again.

On Tue, Feb 9, 2021 at 9:53 PM Vinoth Chandar <[email protected]> wrote:

> Hi Satish,
>
> Been meaning to respond to this. I think I like the idea overall.
>
> Here's (hopefully) my understanding of it; let me know if I am getting this right.
>
> Predominantly, we are just talking about one problem: where do we send the "inserts"?
>
> Today the upsert partitioner does the file sizing/bin-packing etc. for inserts and then sends some inserts over to existing file groups to maintain file size. We can abstract all of this into strategies and some kind of pipeline abstraction, and have it also consider "affinity" to an existing file group based on, say, information stored in the metadata table?
>
> I think this is complementary to what we do today and can be helpful. A first step may be to abstract the existing write pipeline as a series of "optimization" stages and bring things like file sizing under that.
>
> On bucketing, I am not against Hive bucketing or anything. But with record-level indexes and the granular/micro partitions we can achieve using clustering, is it still the most efficient design? That's a question I would love to find answers for. I never liked the static/hash-partitioning-based schemes in bucketing; they introduce a lot of manual data munging if things change.
>
> Thanks
> Vinoth
>
> On Wed, Feb 3, 2021 at 5:17 PM Satish Kotha <[email protected]> wrote:
>
> > I got some feedback that this thread may be a bit complex to understand, so I tried to simplify the proposal to the below:
> >
> > Users can already specify 'partitionpath' using this config <https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY> when writing data. My proposal is that we also give users the ability to identify (or hint at) the 'fileId' while writing the data. For example, users could say 'locality.columns: session_id'. We would deterministically map every session_id to a specific fileGroup in Hudi (using hash-modulo or range-partitioning etc.), so that all values for a session_id are co-located in the same data/log file.
> >
> > Hopefully, this explains the idea better. Appreciate any feedback.
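To make the simplified proposal concrete, here is a small sketch of the deterministic hash-modulo assignment being suggested; this is illustrative only, not existing Hudi code, and 'locality.columns' is the config name proposed above:

// Sketch of the proposed deterministic session_id -> file group mapping.
object LocalityAssignment {
  // Hash the session_id and take the modulo over the number of file groups
  // in the partition to pick a target file group.
  def fileGroupFor(sessionId: String, fileGroupIds: IndexedSeq[String]): String = {
    val idx = Math.floorMod(sessionId.hashCode, fileGroupIds.length)
    fileGroupIds(idx)
  }
}

// With 4 file groups, every record for session "s-42" always lands in the
// same file group, so its data/log files stay co-located.
val fileGroups = IndexedSeq("fg-0", "fg-1", "fg-2", "fg-3")
val target = LocalityAssignment.fileGroupFor("s-42", fileGroups)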
> > On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <[email protected]> wrote:
> >
> > > Hello,
> > >
> > > Clustering <https://hudi.apache.org/blog/hudi-clustering-intro/> is a great feature for improving data locality. But it has a (relatively big) cost of rewriting the data after ingestion. I think there are other ways to improve data locality during ingestion. For example, we could add a new Index (or partitioner) that reads the values of columns that are important from a data locality perspective. We could then compute a hash modulo on the value and use that to deterministically identify the file group that the record has to be written into.
> > >
> > > More detailed example:
> > > Assume we introduce 2 new configs:
> > >
> > > hoodie.datasource.write.num.file.groups: "N" # Controls the total number of fileIds allowed (per partition).
> > > hoodie.datasource.write.locality.columns: "session_id,timestamp" # Identifies columns that are important for data locality.
> > >
> > > (I can come up with better names for the configs if the general idea sounds good.)
> > >
> > > During ingestion, we generate 'N' fileIds for each partition (if that partition already has K fileIds, we generate N-K new fileIds). Let's say these fileIds are stored in a fileIdList data structure. For each row, we compute 'hash(row.get(session_id) + row.get(timestamp)) % N'. This value is used as the index into the fileIdList data structure to deterministically identify the file group for the row.
> > >
> > > This improves data locality by ensuring rows with a given column value are stored in the same file. The hashing could be done in two places:
> > > 1) A custom index that tags the location for each row based on the values of 'session_id+timestamp'.
> > > 2) A new partitioner that assigns buckets for each row based on the values of 'session_id+timestamp'.
> > >
> > > *Advantages:*
> > > 1) No need to rewrite data to improve data locality.
> > > 2) Integrates well with Hive bucketing (Spark is also adding support for Hive bucketing <https://issues.apache.org/jira/browse/SPARK-19256>).
> > > 3) This reduces scan cycles to find a particular key, because it ensures the key is present in a specific fileId. Similarly, joins across multiple tables would be efficient if they both choose the same 'locality.columns'.
> > >
> > > *Disadvantages:*
> > > 1) Users need to know the total number of file groups to generate per partition. This value is assumed to be static for all partitions, so if significant changes are expected in traffic volume, this may not partition the data well.
> > > (We could also consider making this configurable per partition, which adds additional complexity but is feasible to do.)
> > > 2) This may not be as efficient as clustering. For example, data for a given column value is guaranteed to be co-located in the same file, but it may not be in the same block (row group in Parquet), so more blocks need to be read by query engines.
> > >
> > > Clustering can still be useful for other use cases such as stitching files, transforming data for efficiency, etc. Clustering can also be useful for a few sorting scenarios - e.g., if users cannot predict a good value for the number of file groups needed.
> > >
> > > Appreciate any feedback. Let me know if you have other ideas for improving data locality. If you are interested in this idea and want to collaborate, please reach out.
> > >
> > > Thanks
> > > Satish
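As a rough sketch of the detailed example above (the two config names are the proposed ones, not existing Hudi configs, and the fileId generation and hashing here are only illustrative):

import java.util.UUID

// Hypothetical values for the two proposed configs.
val numFileGroups = 16                                // hoodie.datasource.write.num.file.groups
val localityColumns = Seq("session_id", "timestamp")  // hoodie.datasource.write.locality.columns

// Ensure a partition has exactly N fileIds: if it already has K, generate
// N-K new ones. The resulting ordered list is the fileIdList.
def fileIdList(existing: Seq[String], n: Int): IndexedSeq[String] =
  (existing ++ Seq.fill(n - existing.size)(UUID.randomUUID().toString)).take(n).toIndexedSeq

// For each row, concatenate the locality column values, hash, and take the
// modulo over N to pick an index into the fileIdList.
def fileIdForRow(row: Map[String, String], ids: IndexedSeq[String]): String = {
  val key = localityColumns.map(row(_)).mkString("+")
  ids(Math.floorMod(key.hashCode, ids.length))
}

// Rows with the same locality column values always map to the same fileId.
val ids = fileIdList(existing = Seq("fid-0"), n = numFileGroups)
val fid = fileIdForRow(Map("session_id" -> "s-42", "timestamp" -> "1612915200"), ids)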
