This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6f53c7fb73f HIVE-29166: Fix the partition column update logic in ConvertJoinMapJoin#convertJoinBucketMapJoin. (#6048)
6f53c7fb73f is described below
commit 6f53c7fb73ffc4674234957106c597d4a42bccd9
Author: Seonggon Namgung <[email protected]>
AuthorDate: Fri Sep 5 01:52:52 2025 +0900
HIVE-29166: Fix the partition column update logic in ConvertJoinMapJoin#convertJoinBucketMapJoin. (#6048)
---
.../src/test/queries/positive/bucket_map_join_9.q | 9 ++
.../test/results/positive/bucket_map_join_9.q.out | 65 ++++++++++++
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 47 +++++----
.../test/queries/clientpositive/bucketmapjoin14.q | 9 ++
.../clientpositive/llap/bucketmapjoin14.q.out | 112 +++++++++++++++++++++
5 files changed, 222 insertions(+), 20 deletions(-)
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q b/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q
new file mode 100644
index 00000000000..4c201d71850
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out b/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out
new file mode 100644
index 00000000000..8153bdd697f
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out
@@ -0,0 +1,65 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Map 1 <- Map 2 (CUSTOM_EDGE)
+
+Stage-0
+ Fetch Operator
+ limit:-1
+ Stage-1
+ Map 1 vectorized
+ File Output Operator [FS_53]
+ Map Join Operator [MAPJOIN_52] (rows=2 width=530)
+ BucketMapJoin:true,Conds:SEL_51._col1, _col2=RS_49._col1, _col2(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
+ <-Map 2 [CUSTOM_EDGE] vectorized
+ MULTICAST [RS_49]
+ PartitionCols:_col2, _col1
+ Select Operator [SEL_48] (rows=2 width=265)
+ Output:["_col0","_col1","_col2"]
+ Filter Operator [FIL_47] (rows=2 width=265)
+ predicate:(id is not null and part is not null)
+ TableScan [TS_3] (rows=2 width=265)
+ default@tbl,tbl2,Tbl:COMPLETE,Col:COMPLETE,Output:["foid","part","id"]
+ <-Select Operator [SEL_51] (rows=2 width=265)
+ Output:["_col0","_col1","_col2"]
+ Filter Operator [FIL_50] (rows=2 width=265)
+ predicate:(id is not null and part is not null)
+ TableScan [TS_0] (rows=2 width=265)
+ default@tbl,tbl,Tbl:COMPLETE,Col:COMPLETE,Grouping Num Buckets:100,Grouping Partition Columns:["id","part"],Output:["foid","part","id"]
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1234 PART_123 1 1234 PART_123 1
+1235 PART_124 2 1235 PART_124 2
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 37348dcef06..a622a0a7c02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -656,32 +656,39 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
// on small table(s).
ReduceSinkOperator bigTableRS = (ReduceSinkOperator)joinOp.getParentOperators().get(bigTablePosition);
OpTraits opTraits = bigTableRS.getOpTraits();
- List<List<String>> listBucketCols = opTraits.getBucketColNames();
+ // It is guaranteed there is only 1 list within bigTableRS.getOpTraits().getBucketColNames().
+ List<String> listBucketCols = opTraits.getBucketColNames().get(0);
List<ExprNodeDesc> bigTablePartitionCols = bigTableRS.getConf().getPartitionCols();
- boolean updatePartitionCols = false;
+ boolean updatePartitionCols = listBucketCols.size() != bigTablePartitionCols.size();
List<Integer> positions = new ArrayList<>();
- CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
- if (listBucketCols.get(0).size() != bigTablePartitionCols.size()) {
- updatePartitionCols = true;
- // Prepare updated partition columns for small table(s).
- // Get the positions of bucketed columns
-
- int bigTableExprPos = 0;
- Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
- final boolean[] retainedColumns = new boolean[listBucketCols.get(0).size()];
- for (ExprNodeDesc bigTableExpr : bigTablePartitionCols) {
- // It is guaranteed there is only 1 list within listBucketCols.
- for (int i = 0; i < listBucketCols.get(0).size(); i++) {
- final String colName = listBucketCols.get(0).get(i);
- if (colExprMap.get(colName).isSame(bigTableExpr)) {
- positions.add(bigTableExprPos);
- retainedColumns[i] = true;
- }
+ // Compare the partition columns and the bucket columns of bigTableRS.
+ Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
+ final boolean[] retainedColumns = new boolean[listBucketCols.size()];
+ for (int bucketColIdx = 0; bucketColIdx < listBucketCols.size(); bucketColIdx++) {
+ for (int bigTablePartIdx = 0; bigTablePartIdx < bigTablePartitionCols.size(); bigTablePartIdx++) {
+ ExprNodeDesc bigTablePartExpr = bigTablePartitionCols.get(bigTablePartIdx);
+ ExprNodeDesc bucketColExpr = colExprMap.get(listBucketCols.get(bucketColIdx));
+ if (bigTablePartExpr.isSame(bucketColExpr)) {
+ positions.add(bigTablePartIdx);
+ retainedColumns[bucketColIdx] = true;
+ // If the positions of the partition column and the bucket column are not the same,
+ // then we need to update the position of the partition column in small tables.
+ updatePartitionCols = updatePartitionCols || bucketColIdx != bigTablePartIdx;
+ break;
}
- bigTableExprPos = bigTableExprPos + 1;
}
+ }
+ // If the number of partition columns is less than the number of bucket columns,
+ // then we cannot properly distribute small tables onto bucketized map tasks.
+ // Bail out.
+ if (positions.size() < listBucketCols.size()) {
+ return false;
+ }
+
+ CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
+ if (updatePartitionCols) {
Preconditions.checkState(opTraits.getCustomBucketFunctions().size() == 1);
if (opTraits.getCustomBucketFunctions().get(0) != null) {
final Optional<CustomBucketFunction> selected =
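
The rewritten loop in this hunk walks the big table's bucket columns in bucket order, looks up the ReduceSinkOperator partition column backed by the same expression, records that position, and sets updatePartitionCols whenever the column counts or positions disagree; if some bucket column has no matching partition column, the bucket map join conversion is abandoned. Below is a minimal, self-contained sketch of that matching step only, using plain strings in place of Hive's ExprNodeDesc and made-up names (BucketColumnMatchSketch, matchBucketCols); the retainedColumns bookkeeping for CustomBucketFunction selection is omitted. It illustrates the idea under those assumptions and is not the actual Hive code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class BucketColumnMatchSketch {

  /** Result of matching bucket columns against the big table's RS partition columns. */
  static final class Match {
    final List<Integer> positions;      // partition-column index backing each bucket column
    final boolean updatePartitionCols;  // true if small-table partition columns must be rewritten
    Match(List<Integer> positions, boolean updatePartitionCols) {
      this.positions = positions;
      this.updatePartitionCols = updatePartitionCols;
    }
  }

  /**
   * Mirrors the patched loop: walk the bucket columns in bucket order and, for each one,
   * find the big-table partition column that carries the same expression. Returns
   * Optional.empty() when some bucket column has no matching partition column, in which
   * case the conversion to a bucket map join should be abandoned.
   */
  static Optional<Match> matchBucketCols(List<String> bucketCols, List<String> bigTablePartitionCols) {
    // A size mismatch alone already forces the small-table partition columns to be updated.
    boolean updatePartitionCols = bucketCols.size() != bigTablePartitionCols.size();
    List<Integer> positions = new ArrayList<>();
    for (int bucketColIdx = 0; bucketColIdx < bucketCols.size(); bucketColIdx++) {
      for (int partIdx = 0; partIdx < bigTablePartitionCols.size(); partIdx++) {
        if (bucketCols.get(bucketColIdx).equals(bigTablePartitionCols.get(partIdx))) {
          positions.add(partIdx);
          // A position mismatch (e.g. bucket spec (id, part) vs. join-key order (part, id))
          // also requires rewriting the small-table partition columns.
          updatePartitionCols = updatePartitionCols || bucketColIdx != partIdx;
          break;
        }
      }
    }
    // Not every bucket column is covered by a partition column: rows could not be routed
    // to the right bucketized map task, so give up.
    if (positions.size() < bucketCols.size()) {
      return Optional.empty();
    }
    return Optional.of(new Match(positions, updatePartitionCols));
  }

  public static void main(String[] args) {
    // Bucket order (id, part) differs from the join-key order (part, id),
    // as in the bucket_map_join_9.q test above.
    Optional<Match> m = matchBucketCols(Arrays.asList("id", "part"), Arrays.asList("part", "id"));
    System.out.println(m.get().positions + " updatePartitionCols=" + m.get().updatePartitionCols);
    // Prints: [1, 0] updatePartitionCols=true
  }
}

This ordering is why the small-table ReduceSinkOperator in the plans below partitions on _col2, _col1 (id, part) even though the map join keys are listed as _col1, _col2.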
diff --git a/ql/src/test/queries/clientpositive/bucketmapjoin14.q b/ql/src/test/queries/clientpositive/bucketmapjoin14.q
new file mode 100644
index 00000000000..b710456d75c
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucketmapjoin14.q
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
diff --git a/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out b/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out
new file mode 100644
index 00000000000..4cf433e58f1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out
@@ -0,0 +1,112 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+POSTHOOK: Lineage: tbl.foid SCRIPT []
+POSTHOOK: Lineage: tbl.id SCRIPT []
+POSTHOOK: Lineage: tbl.part SCRIPT []
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Map 1 <- Map 2 (CUSTOM_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: tbl
+ filterExpr: (id is not null and part is not null) (type: boolean)
+ Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: (id is not null and part is not null) (type: boolean)
+ Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: foid (type: string), part (type: string), id (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col1 (type: string), _col2 (type: string)
+ 1 _col1 (type: string), _col2 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+ input vertices:
+ 1 Map 2
+ Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+ Map 2
+ Map Operator Tree:
+ TableScan
+ alias: tbl2
+ filterExpr: (id is not null and part is not null) (type: boolean)
+ Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: (id is not null and part is not null) (type: boolean)
+ Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: foid (type: string), part (type: string), id (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col1 (type: string), _col2 (type: string)
+ null sort order: zz
+ sort order: ++
+ Map-reduce partition columns: _col2 (type: string), _col1 (type: string)
+ Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: string)
+ Execution mode: vectorized, llap
+ LLAP IO: all inputs
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+1234 PART_123 1 1234 PART_123 1
+1235 PART_124 2 1235 PART_124 2