I got some feedback that this thread may be a bit complex to understand, so
I tried to simplify the proposal below:

Users can already specify the 'partitionpath' using this config
<https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY>
when writing data. My proposal is to also give users the ability to identify
(or hint at) the 'fileId' while writing the data. For example, users could
set 'locality.columns: session_id'. We would then deterministically map every
session_id to a specific fileGroup in Hudi (using hash-modulo,
range-partitioning, etc.), so all records for a given session_id are
co-located in the same data/log file.
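As a minimal sketch of that mapping (plain Python; the file-group count and
the choice of hash here are illustrative, not necessarily what Hudi would use):

```python
import hashlib

NUM_FILE_GROUPS = 8  # illustrative per-partition file-group count

def file_group_for(session_id: str) -> int:
    """Deterministically map a session_id to a file-group index (hash-modulo)."""
    digest = hashlib.md5(session_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_FILE_GROUPS
```

Because the mapping depends only on the value, every record with the same
session_id lands in the same file group across ingestion batches.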

Hopefully, this explains the idea better. Appreciate any feedback.

On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <[email protected]> wrote:

> Hello,
>
> Clustering <https://hudi.apache.org/blog/hudi-clustering-intro/> is a
> great feature for improving data locality. But it has a (relatively big)
> cost to rewrite the data after ingestion. I think there are other ways to
> improve data locality during ingestion. For example, we can add a new Index
> (or partitioner) that reads values for columns that are important from a
> data locality perspective. We could then compute hash modulo on the value
> and use that to deterministically identify the file group that the record
> has to be written into.
>
> More detailed example:
> Assume we introduce two new configs:
> hoodie.datasource.write.num.file.groups: "N" #Controls the total number of
> fileIds allowed (per partition).
>
> hoodie.datasource.write.locality.columns: "session_id,timestamp" #Identify
> columns that are important for data locality.
>
> (I can come up with better names for the configs if the general idea
> sounds good.)
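The proposed configs would presumably be passed like any other Hudi write
option through the Spark DataSource writer; a hypothetical sketch (the two
option names are the proposed ones from this email, not existing Hudi
configs):

```
df.write.format("hudi")
  .option("hoodie.datasource.write.num.file.groups", "8")
  .option("hoodie.datasource.write.locality.columns", "session_id,timestamp")
  // ...existing Hudi write options (table name, record key, partitionpath)...
  .mode("append")
  .save(basePath)
```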
>
> During ingestion, we generate 'N' fileIds for each partition (if a
> partition already has K fileIds, we generate N-K new ones). Let's say
> these fileIds are stored in a fileIdList data structure. For each row, we
> compute 'hash(row.get(session_id)+row.get(timestamp)) % N'. This value is
> used as an index into fileIdList to deterministically identify the file
> group for the row.
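A sketch of that per-row assignment in plain Python (the fileId naming,
the hash function, and the example row are illustrative stand-ins for what
the index/partitioner would do):

```python
import hashlib

N = 4  # hoodie.datasource.write.num.file.groups (proposed config)
# Pretend these are the N fileIds generated for one partition.
file_id_list = [f"file-group-{i}" for i in range(N)]

def assign_file_id(row: dict) -> str:
    """Compute hash(session_id + timestamp) % N and index into file_id_list."""
    key = str(row["session_id"]) + str(row["timestamp"])
    bucket = int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % N
    return file_id_list[bucket]
```

The same (session_id, timestamp) pair always resolves to the same fileId,
which is what gives the co-location guarantee.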
>
> This improves data locality by ensuring columns with a given value are
> stored in the same file. This hashing could be done in two places:
> 1) A custom index that tags location for each row based on values for
> 'session_id+timestamp'.
> 2) In a new partitioner that assigns buckets for each row based on values
> for 'session_id+timestamp'.
>
> *Advantages:*
> 1) No need to rewrite data for improving data locality.
> 2) Integrates well with Hive bucketing (Spark is also adding support for
> Hive bucketing <https://issues.apache.org/jira/browse/SPARK-19256>).
> 3) This reduces scan cycles when looking up a particular key, because the
> key is guaranteed to be in a specific fileId. Similarly, joins across
> multiple tables would be efficient if both tables choose the same
> 'locality.columns'.
>
> *Disadvantages:*
> 1) Users need to know the total number of file groups to generate per
> partition. This value is assumed to be static across all partitions, so if
> significant changes are expected in traffic volume, this may not partition
> the data well. (We could also consider making this configurable per
> partition, which adds complexity but is feasible.)
> 2) This may not be as efficient as clustering. For example, data for a
> given column value is guaranteed to be co-located in the same file, but
> it may not be in the same block (row group in Parquet), so more blocks
> need to be read by query engines.
>
>
> Clustering can still be useful for other use cases such as stitching
> files, transforming data for efficiency etc. Clustering can also be useful
> for a few sorting scenarios - e.g., if users cannot predict a good value
> for the number of file groups needed.
>
> Appreciate any feedback. Let me know if you have other ideas on improving
> data locality. If you are interested in this idea and want to collaborate,
> please reach out.
>
> Thanks
> Satish
>
