This is an automated email from the ASF dual-hosted git repository. szita pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new be528a2050 HIVE-26110: Bulk insert into partitioned table creates lots of files in iceberg (#3174) (Adam Szita, reviewed by Marton Bod, Peter Vary and Rajesh Balamohan) be528a2050 is described below commit be528a2050b3164ac7dd779cff932c5c6449025b Author: Adam Szita <40628386+sz...@users.noreply.github.com> AuthorDate: Tue Apr 5 11:01:56 2022 +0200 HIVE-26110: Bulk insert into partitioned table creates lots of files in iceberg (#3174) (Adam Szita, reviewed by Marton Bod, Peter Vary and Rajesh Balamohan) --- .../src/test/results/positive/dynamic_partition_writes.q.out | 4 ++++ .../hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java | 7 ++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out index 91b3808faf..36fb39c091 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out @@ -75,6 +75,7 @@ Stage-3 Output:["_col0","_col1","_col1"] <-Map 1 [SIMPLE_EDGE] vectorized PARTITION_ONLY_SHUFFLE [RS_13] + PartitionCols:_col1 Select Operator [SEL_12] (rows=20 width=91) Output:["_col0","_col1"] TableScan [TS_0] (rows=20 width=91) @@ -166,6 +167,7 @@ Stage-3 Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] <-Map 1 [SIMPLE_EDGE] vectorized PARTITION_ONLY_SHUFFLE [RS_13] + PartitionCols:iceberg_bucket(_col1, 2) Select Operator [SEL_12] (rows=20 width=91) Output:["_col0","_col1"] TableScan [TS_0] (rows=20 width=91) @@ -257,6 +259,7 @@ Stage-3 Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"] <-Map 1 [SIMPLE_EDGE] vectorized PARTITION_ONLY_SHUFFLE [RS_13] + PartitionCols:_col1, iceberg_bucket(_col2, 3) Select Operator [SEL_12] (rows=20 width=99) Output:["_col0","_col1","_col2"] TableScan [TS_0] (rows=20 width=99) @@ -387,6 +390,7 @@ Stage-3 Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"] <-Map 1 [SIMPLE_EDGE] vectorized PARTITION_ONLY_SHUFFLE [RS_16] + PartitionCols:_col1, iceberg_bucket(_col2, 3) Select Operator [SEL_15] (rows=4 width=99) Output:["_col0","_col1","_col2"] Filter Operator [FIL_14] (rows=4 width=99) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 03b4124a6f..2668f269b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -648,7 +648,12 @@ public class SortedDynPartitionOptimizer extends Transform { ArrayList<ExprNodeDesc> partCols = Lists.newArrayList(); for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) { - keyCols.add(customSortExpr.apply(allCols)); + ExprNodeDesc colExpr = customSortExpr.apply(allCols); + // Custom sort expressions are marked as KEYs, which is required for sorting the rows that are going for + // a particular reducer instance. They also need to be marked as 'partition' columns for MapReduce shuffle + // phase, in order to gather the same keys to the same reducer instances. + keyCols.add(colExpr); + partCols.add(colExpr); } // we will clone here as RS will update bucket column key with its