Hi folks,

Here is my proposal. Thank you very much for reading it. I look forward to
your agreement to create an RFC for it.

Background

To avoid rewriting an entire partition when only a small amount of local data
changes, Hudi divides each partition into multiple File Groups, and each File
Group is identified by a File ID. In this way, when a small amount of local
data is modified, only the data of the corresponding File Group needs to be
rewritten. Hudi consistently maps a given record to a File ID through its
index mechanism; the mapping between Record Key and File Group/File ID does
not change once the first version of the record is written.

At present, Hudi's main indexes are the Bloom filter index, the HBase index,
and the bucket index. The Bloom filter index suffers from false positives;
when a large amount of data produces a large number of File Groups, the false
positive problem is magnified and performance degrades. The HBase index
depends on an external HBase database, may become inconsistent with the table,
and ultimately increases operation and maintenance costs. With the bucket
index, each bucket corresponds to one File Group: you pre-select the number of
buckets and use a hash function to determine which bucket a record belongs to,
so the mapping between Record Key and File Group/File ID is determined
directly by the hash function. However, the number of buckets must be fixed
when the table is created, based on the estimated amount of data, and cannot
be changed afterwards. An unreasonable number of buckets will seriously affect
performance. Unfortunately, the amount of data is often unpredictable and will
continue to grow.
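The fixed mapping at the heart of the bucket index can be sketched as follows.
This is a minimal illustration with an assumed hash function and class name,
not Hudi's actual code:

```java
// Minimal sketch of a bucket-index style mapping: each record key is
// hashed to a fixed bucket, so the bucket count cannot change after
// table creation without remapping existing keys. Illustrative only.
public class BucketIndexSketch {
    // Assumed hash function: non-negative modulo of the key's hash code.
    public static int bucketFor(String recordKey, int numBuckets) {
        return Math.floorMod(recordKey.hashCode(), numBuckets);
    }
}
```

Note that the same key generally lands in a different bucket if numBuckets
changes, which is exactly why the bucket count must be fixed up front.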

Hash partition feasibility

In this context, I put forward the idea of hash partitioning. The principle is
similar to the bucket index, but in addition to the advantages of the bucket
index, hash partitioning can retain the Bloom filter index: when the amount of
data in a hash partition grows too large, the data in that partition is split
into multiple files managed by the Bloom filter index. Therefore, the hash
partition does not have the bucket index's heavy dependence on a pre-chosen
bucket count. Compared with the Bloom filter index alone, a lookup can go
directly to the record's hash partition, which greatly narrows the set of
files the Bloom filter has to check and reduces Bloom filter false positives.
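The lookup-scope reduction described above can be sketched as follows. The
file layout, names, and hash function here are illustrative assumptions, not
Hudi's actual implementation:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of how hash partition pruning narrows the set of
// files the Bloom filter index must probe during a key lookup.
public class LookupScopeSketch {
    static final int NUM_PARTITIONS = 4;
    // Hypothetical layout: two data files per hash partition.
    static final Map<Integer, List<String>> FILES_BY_PARTITION = new HashMap<>();
    static {
        for (int p = 0; p < NUM_PARTITIONS; p++) {
            FILES_BY_PARTITION.put(p,
                Arrays.asList("part-" + p + "-file-0", "part-" + p + "-file-1"));
        }
    }

    static int hashPartition(String recordKey) {
        return Math.floorMod(recordKey.hashCode(), NUM_PARTITIONS);
    }

    // Only the files inside the key's hash partition are Bloom-filter
    // candidates; all other partitions are pruned away.
    static List<String> candidateFiles(String recordKey) {
        return FILES_BY_PARTITION.get(hashPartition(recordKey));
    }
}
```

Here a lookup probes 2 of 8 files instead of all 8; with more partitions the
Bloom filter's search scope, and hence its false positives, shrink further.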

Design of a simple hash partition implementation

The idea is to use the capabilities of the ComplexKeyGenerator to implement
hash partitioning: the hash partition field is one of the partition fields of
the ComplexKeyGenerator.

When hash.partition.fields is specified and partition.fields contains
_hoodie_hash_partition, a column named _hoodie_hash_partition is added to the
table as one of the partition keys.
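As a sketch of the configuration, the option names hash.partition.fields and
partition.fields come from this proposal; the remaining keys and all values
are assumptions for the sake of the example, not a verified setup:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative writer options for a hash-partitioned table; values and
// the non-proposal option names are assumptions, not verified config.
public class HashPartitionOptionsSketch {
    public static Map<String, String> writeOptions() {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hoodie.table.name", "demo_table");
        opts.put("hoodie.datasource.write.recordkey.field", "id");
        opts.put("hoodie.datasource.write.keygenerator.class",
                 "org.apache.hudi.keygen.ComplexKeyGenerator");
        // Proposed options: hash on `id`, and include the generated
        // _hoodie_hash_partition column among the partition fields.
        opts.put("hash.partition.fields", "id");
        opts.put("partition.fields", "region,_hoodie_hash_partition");
        return opts;
    }
}
```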

If predicates on hash.partition.fields appear in a query, a
_hoodie_hash_partition = X predicate is automatically added to the query for
partition pruning.
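The predicate rewriting can be sketched as follows; the class, method names,
and hash function are hypothetical, not the PR's actual code:

```java
// Minimal sketch of the query rewrite: given the value of an equality
// predicate on the hash partition field, derive and append a
// _hoodie_hash_partition predicate so the engine can prune partitions.
public class PredicatePushdownSketch {
    static final int NUM_PARTITIONS = 16;

    // Assumed hash function matching the one used at write time.
    static int hashPartition(String value) {
        return Math.floorMod(value.hashCode(), NUM_PARTITIONS);
    }

    static String augment(String query, String predicateValue) {
        // Append the derived partition predicate to the WHERE clause.
        return query + " AND _hoodie_hash_partition = " + hashPartition(predicateValue);
    }
}
```

For example, a query with the predicate id = 'k1' would gain an additional
_hoodie_hash_partition equality predicate computed from 'k1'.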

Advantages of this design: the implementation is simple and does not modify
core functions, so the risk is low.

The above design has been implemented in PR 7984:

https://github.com/apache/hudi/pull/7984

How to use hash partitioning with the Spark data source is shown in the
testHashPartition method of

https://github.com/lvhu-goodluck/hudi/blob/hash_partition_spark_data_source/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala

Perhaps for experts, the implementation in the PR is not elegant enough. I
also look forward to a more elegant implementation, from which I can learn
more.

Problems with hash partitioning:

Because Spark SQL is unaware of hash partitions, when two hash-partitioned
tables are joined, the optimizer may not produce the optimal execution plan.
However, because Hudi can still prune each table individually, join
performance is still greatly improved compared with no hash partitioning.

The same problem also exists for GROUP BY: Spark SQL does not know that the
aggregation has already been completed within each hash partition, so it may
aggregate the per-partition results again. However, because the data volume
after per-partition aggregation is very small, this redundant aggregation has
little impact on overall performance.

I'm not sure whether bucket indexes have the same problems as hash partitions.

At 2023-02-17 05:18:19, "Y Ethan Guo" <[email protected]> wrote:
>+1 Thanks Lvhu for bringing up the idea.  As Alexey suggested, it would be
>good for you to write down the proposal with design details for discussion
>in the community.
>
>On Thu, Feb 16, 2023 at 11:28 AM Alexey Kudinkin <[email protected]> wrote:
>
>> Thanks for your contribution, Lvhu!
>>
>> I think we should actually kick-start this effort with an small RFC
>> outlining proposed changes first, as this is modifying the core read-flow
>> for all Hudi tables and we want to make sure our approach there is
>> rock-solid.
>>
>> On Thu, Feb 16, 2023 at 6:34 AM 吕虎 <[email protected]> wrote:
>>
>> > Hi folks,
>> >       PR 7984【 https://github.com/apache/hudi/pull/7984 】 implements
>> hash
>> > partitioning.
>> >       As you know, It is often difficult to find an appropriate partition
>> > key in the existing big data. Hash partitioning can easily solve this
>> > problem. it can greatly improve the performance of hudi's big data
>> > processing.
>> >       The idea is to use the hash partition field as one of the partition
>> > fields of the ComplexKeyGenerator, so this PR  implementation does not
>> > involve logic modification of core code.
>> >       The codes are easy to review, but I think hash partition is very
>> > usefull. we really need it.
>> >       How to use hash partition in spark data source can refer to
>> >
>> https://github.com/lvhu-goodluck/hudi/blob/hash_partition_spark_data_source/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
>> >  #testHashPartition
>> >
>> >       No public API or user-facing feature change or any performance
>> > impact if the hash partition parameters are not specified.
>> >
>> >       When hash.partition.fields is specified and partition.fields
>> > contains _hoodie_hash_partition, a column named _hoodie_hash_partition
>> will
>> > be added in this table as one of the partition key.
>> >
>> >       If predicates of hash.partition.fields appear in the query
>> > statement, the _hoodie_hash_partition = X predicate will be automatically
>> > added to the query statement for partition pruning.
>> >
>> >         Hope folks help and review!
>> >       Thanks!
>> > Lvhu
>> >
>>
