c21 opened a new pull request #33432:
URL: https://github.com/apache/spark/pull/33432


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: 
https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: 
https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a 
faster review.
     7. If you want to add a new configuration, please read the guideline first 
for naming configurations in
        
'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the 
guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section 
is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster 
reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class 
hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other 
DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   This is a re-work of https://github.com/apache/spark/pull/30003. Here we add support for writing Hive bucketed tables with the Parquet/ORC file formats (data source v1 write path, with Hive hash as the hash function). Support for Hive's other file formats will be added in follow-up PRs.
   
   The changes are mostly in:
   
   * `HiveMetastoreCatalog.scala`: When converting a Hive table relation to a data source relation, pass the bucket info (`BucketSpec`) and other Hive-related info as options into `HadoopFsRelation` and `LogicalRelation`, so they can later be accessed by `FileFormatWriter` to customize the bucket id and file name.
   
   * `FileFormatWriter.scala`: Use `HiveHash` for `bucketIdExpression` when writing to a Hive bucketed table (see the sketch after this list). In addition, the Spark output file names should follow the Hive/Presto/Trino bucketed file naming convention. Introduce a new parameter `bucketFileNamePrefix`, which drives the corresponding change in `FileFormatDataWriter`.
   
   * `HadoopMapReduceCommitProtocol`: Implement the new file name APIs introduced in https://github.com/apache/spark/pull/33012, and change its subclass `PathOutputCommitProtocol`, so that Hive bucketed table writes work with all commit protocols (including the S3A commit protocol).
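
   For reference, a minimal sketch (not the actual PR code; the function and parameter names here are hypothetical) of how the bucket id expression could be switched between Spark's native bucketing and Hive-compatible bucketing, using Catalyst's existing `Murmur3Hash` and `HiveHash` expressions:

   ```scala
   import org.apache.spark.sql.catalyst.expressions.{
     Attribute, Expression, HiveHash, Literal, Murmur3Hash, Pmod}

   // Illustrative sketch only: pick the hash function based on whether the target
   // is a Hive bucketed table. Hive/Presto/Trino compute the bucket id as
   // pmod(hiveHash(bucket columns), numBuckets), while Spark's native bucketing
   // uses Murmur3, which is why the two are not interchangeable.
   def bucketIdExpression(
       bucketColumns: Seq[Attribute],
       numBuckets: Int,
       forHiveBucketedTable: Boolean): Expression = {
     if (forHiveBucketedTable) {
       Pmod(HiveHash(bucketColumns), Literal(numBuckets))
     } else {
       Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))
     }
   }
   ```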
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   To make Spark write bucketed tables that are compatible with other SQL engines. Currently, Spark bucketed tables cannot be leveraged by other SQL engines such as Hive and Presto, because Spark uses a different hash function (Spark's Murmur3Hash). With this PR, Hive bucketed tables written by Spark can be efficiently read by Presto and Hive for bucket filter pruning, joins, group-bys, etc. This has been, and still is, blocking several companies (confirmed with Facebook, Lyft, etc.) from migrating bucketing workloads from Hive to Spark.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as 
the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes 
- provide the console output, description and/or an example to show the 
behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to 
the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   Yes. Any Hive bucketed table (in Parquet/ORC format) written by Spark is properly bucketed and can be efficiently processed by Hive and Presto/Trino.
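
   For illustration, a minimal usage sketch of the workflow this enables (table and column names are made up):

   ```scala
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

   // A Hive bucketed table defined in the metastore (illustrative names).
   spark.sql("""
     CREATE TABLE hive_bucketed_tbl (key INT, value STRING)
     CLUSTERED BY (key) INTO 8 BUCKETS
     STORED AS PARQUET
   """)

   // With this PR, an insert from Spark produces files that are bucketed with
   // Hive hash and named so that Hive and Presto/Trino recognize the buckets.
   spark.sql("""
     INSERT INTO hive_bucketed_tbl
     SELECT CAST(id AS INT), CAST(id AS STRING) FROM range(100)
   """)
   ```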
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some 
test cases that check the changes thoroughly including negative and positive 
cases if possible.
   If it was tested in a way different from regular unit tests, please clarify 
how you tested step by step, ideally copy and paste-able, so that other 
reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why 
it was difficult to add.
   -->
   * Added a unit test in BucketedWriteWithHiveSupportSuite.scala to verify the bucket file names and that each row is written to the proper bucket.
   * Testing in production is WIP; will update later.
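
   For reference, a minimal sketch of the kind of round-trip check such a test can perform, assuming the bucket id appears as a zero-padded numeric prefix in each bucket file name (the exact naming convention is defined by the PR, not by this sketch); the table/column names and the `hiveBucketId` helper are illustrative:

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.{BoundReference, HiveHash, Literal, Pmod}
   import org.apache.spark.sql.functions.input_file_name
   import org.apache.spark.sql.types.IntegerType

   val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
   import spark.implicits._

   // Hypothetical helper: Hive bucket id for a single int key,
   // i.e. pmod(hiveHash(key), numBuckets), evaluated in interpreted mode.
   def hiveBucketId(key: Int, numBuckets: Int): Int = {
     val hash = HiveHash(Seq(BoundReference(0, IntegerType, nullable = false)))
     Pmod(hash, Literal(numBuckets)).eval(InternalRow(key)).asInstanceOf[Int]
   }

   // For every row, the bucket id recomputed from the key should match the bucket
   // encoded in the name of the file holding that row (illustrative names).
   spark.table("hive_bucketed_tbl")
     .withColumn("file", input_file_name())
     .selectExpr("CAST(key AS INT) AS key", "file")
     .as[(Int, String)]
     .collect()
     .foreach { case (key, file) =>
       val bucketFromName = file.split("/").last.takeWhile(_.isDigit).toInt
       assert(bucketFromName == hiveBucketId(key, numBuckets = 8))
     }
   ```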

