Hello,

Clustering <https://hudi.apache.org/blog/hudi-clustering-intro/> is a great
feature for improving data locality. But it comes with a (relatively large)
cost: rewriting the data after ingestion. I think there are other ways to
improve data locality during ingestion. For example, we could add a new
index (or partitioner) that reads the values of columns that are important
from a data locality perspective, computes a hash modulo on those values,
and uses the result to deterministically identify the file group that the
record should be written into.

More detailed example:
Assume we introduce two new configs:
hoodie.datasource.write.num.file.groups: "N" # Controls the total number of
fileIds allowed (per partition).

hoodie.datasource.write.locality.columns: "session_id,timestamp" # Identifies
the columns that are important for data locality.

(I can come up with better names for the configs if the general idea sounds
good.)
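To make the usage concrete, here is a minimal sketch of a Spark datasource
write that sets the two proposed options. The option keys are the proposed
configs above and do not exist in Hudi today; df and basePath are
placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class LocalityWriteExample {
  public static void write(Dataset<Row> df, String basePath) {
    df.write()
      .format("hudi")
      // Proposed config: cap each partition at 32 file groups.
      .option("hoodie.datasource.write.num.file.groups", "32")
      // Proposed config: columns that drive data locality.
      .option("hoodie.datasource.write.locality.columns", "session_id,timestamp")
      .mode(SaveMode.Append)
      .save(basePath);
  }
}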

During ingestion, we generate 'N' fileIds for each partition (if that
partition already has K fileIds, we generate N-K new ones). Let's say
these fileIds are stored in a fileIdList data structure. For each row, we
compute 'hash(row.get(session_id)+row.get(timestamp)) % N'. This value is
used as the index into the fileIdList to deterministically identify the
file group for the row.
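A rough Java sketch of that lookup (FileGroupAssigner and its method names
are illustrative only, not existing Hudi APIs):

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class FileGroupAssigner {
  private final List<String> fileIdList;

  // Top the partition up to exactly N fileIds: keep the existing K and
  // generate N - K new ones.
  public FileGroupAssigner(List<String> existingFileIds, int n) {
    this.fileIdList = new ArrayList<>(existingFileIds);
    while (fileIdList.size() < n) {
      fileIdList.add(UUID.randomUUID().toString());
    }
  }

  // hash(session_id + timestamp) % N deterministically picks the same
  // file group for the same locality key on every ingest.
  public String assignFileId(String sessionId, String timestamp) {
    int bucket = Math.floorMod((sessionId + timestamp).hashCode(), fileIdList.size());
    return fileIdList.get(bucket);
  }
}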

This improves data locality by ensuring that rows with a given value for
these columns are stored in the same file. The hashing could be done in two
places:
1) A custom index that tags the location for each row based on the values
of 'session_id+timestamp'.
2) A new partitioner that assigns a bucket to each row based on the values
of 'session_id+timestamp' (see the sketch below).
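
For option 2, a bare-bones custom Spark Partitioner might look like this
(again just a sketch under the assumption that the key passed in is the
concatenated locality column values):

import org.apache.spark.Partitioner;

public class LocalityPartitioner extends Partitioner {
  private final int numFileGroups; // hoodie.datasource.write.num.file.groups

  public LocalityPartitioner(int numFileGroups) {
    this.numFileGroups = numFileGroups;
  }

  @Override
  public int numPartitions() {
    return numFileGroups;
  }

  @Override
  public int getPartition(Object key) {
    // The bucket doubles as the index into fileIdList, so all rows with
    // the same 'session_id+timestamp' land in the same file group.
    return Math.floorMod(key.hashCode(), numFileGroups);
  }
}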

*Advantages:*
1) No need to rewrite data for improving data locality.
2) Integrates well with Hive bucketing (Spark is also adding support for
Hive bucketing <https://issues.apache.org/jira/browse/SPARK-19256>).
3) Reduces scan cycles when looking up a particular key, because the key is
guaranteed to live in a specific fileId. Similarly, joins across multiple
tables become efficient if both tables choose the same 'locality.columns'.

*Disadvantages:*
1) Users need to know the total number of file groups to generate per
partition. This value is assumed to be the same (and static) for all
partitions, so if significant changes in traffic volume are expected, this
may not partition the data well. (We could also consider making this value
configurable per partition; that adds complexity, but is feasible to do.)
2) This may not be as efficient as clustering. For example, data for a
given column value is guaranteed to be co-located in the same file, but it
may not be in the same block (row group in Parquet), so query engines may
need to read more blocks.


Clustering can still be useful for other use cases, such as stitching small
files together, transforming data for query efficiency, etc. Clustering can
also help in some sorting scenarios - e.g., when users cannot predict a good
value for the number of file groups needed.

Appreciate any feedback. Let me know if you have other ideas on improving
data locality. If you are interested in this idea and want to collaborate,
please reach out.

Thanks
Satish
