Hi folks,
Here is my proposal. Thank you very much for reading it. I am looking forward
to your agreement to create an RFC for it.
Background
To avoid rewriting an entire partition when only a small amount of data
changes, Hudi divides each partition into multiple File Groups, and each File
Group is identified by a File ID. When a small amount of data is modified, only
the corresponding File Group needs to be rewritten. Hudi consistently maps a
given record to a File ID through its index mechanism, and the mapping between
the Record Key and the File Group/File ID does not change once the first
version of the record is written.
At present, Hudi's main indexes are the Bloom filter index, the HBase index,
and the bucket index. The Bloom filter index suffers from false positives; when
a large amount of data produces a large number of File Groups, the false
positives are amplified and performance degrades. The HBase index depends on an
external HBase database, can become inconsistent with the table, and increases
operation and maintenance costs. With the bucket index, each bucket corresponds
to one File Group: you pre-select the number of buckets and use a hash function
to decide which bucket a record belongs to, so the mapping between the Record
Key and the File Group/File ID is determined directly by the hash function.
However, the number of buckets must be chosen when the table is created, based
on the estimated data volume, and cannot be changed afterwards. An unreasonable
number of buckets seriously hurts performance, and unfortunately the amount of
data is often unpredictable and keeps growing.
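For reference, a bucket-index table is typically configured along the following
lines. This is only a minimal sketch using the Spark DataFrame writer on a
DataFrame df assumed to be in scope; the bucket-index option keys are Hudi's
standard ones, while the table name, fields, bucket count, and path are purely
illustrative.

    // Minimal sketch: writing a Hudi table that uses the bucket index.
    // The number of buckets is fixed at table creation time.
    import org.apache.spark.sql.SaveMode

    df.write.format("hudi").
      option("hoodie.table.name", "orders").
      option("hoodie.datasource.write.recordkey.field", "order_id").
      option("hoodie.datasource.write.precombine.field", "ts").
      option("hoodie.index.type", "BUCKET").
      option("hoodie.bucket.index.num.buckets", "64").  // cannot be changed later
      option("hoodie.bucket.index.hash.field", "order_id").
      mode(SaveMode.Append).
      save("/tmp/hudi/orders")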
Hash partition feasibility
In this context, I put forward the idea of hash partitioning. The principle is
similar to the bucket index, but in addition to the advantages of the bucket
index, hash partitioning retains the Bloom filter index: when the amount of
data in a hash partition grows too large, the data in that partition is split
into multiple files just as with the Bloom filter index. The bucket index's
heavy dependence on a pre-chosen number of buckets therefore does not apply to
hash partitions. Compared with the Bloom filter index alone, a lookup can go
directly to the matching partition, which greatly narrows the set of files the
Bloom filter has to check and reduces its false positives.
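Conceptually, the hash partition value for a record is just a hash of the
chosen field(s) modulo a configured number of hash partitions. Below is a
simplified sketch of that mapping; the function name, the use of String
hashCode, and the partition count are my own illustration, not necessarily what
the PR does.

    // Simplified sketch of deriving a hash partition value from a field value.
    // The hash function and numHashPartitions are illustrative assumptions.
    def hashPartition(fieldValue: String, numHashPartitions: Int): Int = {
      // mask the sign bit so the result is non-negative, then take the modulo
      (fieldValue.hashCode & Int.MaxValue) % numHashPartitions
    }

    // A record with user_id = "u-12345" and 32 hash partitions lands in
    // partition hashPartition("u-12345", 32), so its partition path would
    // contain a component like _hoodie_hash_partition=<that value>.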
Design of a simple hash partition implementation
The idea is to use the capabilities of the ComplexKeyGenerator to implement
hash partitioning: the hash partition field becomes one of the partition fields
of the ComplexKeyGenerator.
When hash.partition.fields is specified and partition.fields contains
_hoodie_hash_partition, a column named _hoodie_hash_partition is added to the
table as one of the partition keys.
If a predicate on the hash.partition.fields columns appears in a query, the
corresponding _hoodie_hash_partition = X predicate is automatically added to
the query for partition pruning, as sketched below.
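To make this concrete, a hypothetical Spark data source write might look
roughly like the following, assuming a DataFrame df in scope. The option keys
hash.partition.fields and partition.fields follow the names used in this
proposal; the exact keys, defaults, and number of hash partitions are defined
in the PR, so treat this as a sketch rather than the final API.

    // Hypothetical sketch: writing a table whose partition path includes the
    // derived _hoodie_hash_partition column via the ComplexKeyGenerator.
    df.write.format("hudi").
      option("hoodie.table.name", "users").
      option("hoodie.datasource.write.recordkey.field", "user_id").
      option("hoodie.datasource.write.precombine.field", "ts").
      option("hoodie.datasource.write.keygenerator.class",
             "org.apache.hudi.keygen.ComplexKeyGenerator").
      // the two option names below follow this proposal; see the PR for the exact keys
      option("hash.partition.fields", "user_id").
      option("partition.fields", "region,_hoodie_hash_partition").
      mode("append").
      save("/tmp/hudi/users")

    // A query with a predicate on the hash partition field, for example
    //   SELECT * FROM users WHERE user_id = 'u-12345'
    // automatically gets an extra _hoodie_hash_partition = X predicate, so
    // only one hash partition needs to be scanned.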
Advantages of this design: the implementation is simple and does not modify
core functionality, so the risk is low.
The above design has been implemented in PR 7984:
https://github.com/apache/hudi/pull/7984
For how to use hash partitioning with the Spark data source, see
testHashPartition in
https://github.com/lvhu-goodluck/hudi/blob/hash_partition_spark_data_source/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
Experts may find the PR's implementation not elegant enough; I would also
welcome a more elegant implementation, from which I can learn more.
Problems with hash partitioning:
Because Spark SQL is not aware of the hash partitioning, a join between two
hash-partitioned tables may not get the optimal execution plan. However,
because Hudi can still prune partitions on each individual table, such joins
still perform much better than they would without hash partitioning.
A similar problem exists for GROUP BY: Spark SQL does not know that the
aggregation could already be completed within each hash partition, so it may
aggregate the per-partition results again. However, because the data volume
after per-partition aggregation is very small, the redundant aggregation has
little impact on overall performance.
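To illustrate both limitations, consider queries like the following over
hash-partitioned tables; the table and column names are illustrative, and a
SparkSession named spark is assumed to be in scope.

    // Join: Spark SQL does not know that "users" and "orders" are both
    // hash-partitioned on user_id, so it may still shuffle both sides;
    // per-table partition pruning still applies to the user_id predicate.
    val joined = spark.sql(
      """
        |SELECT u.user_id, o.amount
        |FROM users u JOIN orders o ON u.user_id = o.user_id
        |WHERE u.user_id = 'u-12345'
      """.stripMargin)

    // GROUP BY: all rows for a given user_id live in one hash partition, but
    // Spark SQL still plans a final aggregation across partitions; the extra
    // step is cheap because the per-partition results are small.
    val counts = spark.sql(
      "SELECT user_id, COUNT(*) AS cnt FROM users GROUP BY user_id")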
I'm not sure whether bucket indexes will have the same problem as hash
partitions.
At 2023-02-17 05:18:19, "Y Ethan Guo" <[email protected]> wrote:
>+1 Thanks Lvhu for bringing up the idea. As Alexey suggested, it would be
>good for you to write down the proposal with design details for discussion
>in the community.
>
>On Thu, Feb 16, 2023 at 11:28 AM Alexey Kudinkin <[email protected]> wrote:
>
>> Thanks for your contribution, Lvhu!
>>
>> I think we should actually kick-start this effort with a small RFC
>> outlining proposed changes first, as this is modifying the core read-flow
>> for all Hudi tables and we want to make sure our approach there is
>> rock-solid.
>>
>> On Thu, Feb 16, 2023 at 6:34 AM 吕虎 <[email protected]> wrote:
>>
>> > Hi folks,
>> > PR 7984【 https://github.com/apache/hudi/pull/7984 】 implements
>> hash
>> > partitioning.
>> > As you know, it is often difficult to find an appropriate partition
>> > key in the existing big data. Hash partitioning can easily solve this
>> > problem. It can greatly improve the performance of Hudi's big data
>> > processing.
>> > The idea is to use the hash partition field as one of the partition
>> > fields of the ComplexKeyGenerator, so this PR implementation does not
>> > involve logic modification of core code.
>> > The code is easy to review, but I think hash partitioning is very
>> > useful. We really need it.
>> > How to use hash partition in spark data source can refer to
>> >
>> https://github.com/lvhu-goodluck/hudi/blob/hash_partition_spark_data_source/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
>> > #testHashPartition
>> >
>> > No public API or user-facing feature change or any performance
>> > impact if the hash partition parameters are not specified.
>> >
>> > When hash.partition.fields is specified and partition.fields
>> > contains _hoodie_hash_partition, a column named _hoodie_hash_partition
>> will
>> > be added in this table as one of the partition key.
>> >
>> > If predicates of hash.partition.fields appear in the query
>> > statement, the _hoodie_hash_partition = X predicate will be automatically
>> > added to the query statement for partition pruning.
>> >
>> > Hope folks help and review!
>> > Thanks!
>> > Lvhu
>> >
>>