This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1374460ae65 HIVE-29458: Iceberg: Sort expressions should not be added for distribution. Partition transforms are added for clustering (distribution and sorting) (#6325)
1374460ae65 is described below

commit 1374460ae65994c9da5ba8341fb80ec8f7a24e44
Author: kokila-19 <[email protected]>
AuthorDate: Wed Mar 11 02:20:34 2026 +0530

    HIVE-29458: Iceberg: Sort expressions should not be added for distribution. Partition transforms are added for clustering (distribution and sorting) (#6325)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  58 +++--
 .../mr/hive/IcebergTransformSortFunctionUtil.java  |   4 +-
 .../iceberg_create_locally_zordered_table.q        |  22 ++
 .../llap/iceberg_alter_locally_ordered_table.q.out |   1 -
 .../iceberg_alter_locally_zordered_table.q.out     |   1 -
 .../iceberg_create_locally_zordered_table.q.out    | 241 ++++++++++++++++++++-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |   2 +-
 .../ql/optimizer/SortedDynPartitionOptimizer.java  | 133 +++++++-----
 .../hadoop/hive/ql/plan/DynamicPartitionCtx.java   |  34 ++-
 9 files changed, 419 insertions(+), 77 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 7bf2aef49a7..0643c26834b 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -890,9 +890,9 @@ public Map<Integer, List<TransformSpec>> 
getPartitionTransformSpecs(
           Pair::first, Collectors.mapping(Pair::second, Collectors.toList())));
   }
 
-  private List<TransformSpec> getSortTransformSpec(Table table) {
-    return table.sortOrder().fields().stream().map(s ->
-            IcebergTableUtil.getTransformSpec(table, s.transform().toString(), 
s.sourceId()))
+  private List<TransformSpec> getWriteSortTransformSpecs(Table table) {
+    return table.sortOrder().fields().stream()
+        .map(s -> IcebergTableUtil.getTransformSpec(table, 
s.transform().toString(), s.sourceId()))
         .toList();
   }
 
@@ -913,11 +913,16 @@ public DynamicPartitionCtx createDPContext(
         hiveConf.getVar(ConfVars.DEFAULT_PARTITION_NAME),
         hiveConf.getIntVar(ConfVars.DYNAMIC_PARTITION_MAX_PARTS_PER_NODE));
 
+    // Add Iceberg partition transforms as custom partition expressions.
+    // These are required for clustering by partition spec/values for 
clustered writers.
     if (table.spec().isPartitioned() &&
-          
hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) >= 0) {
-      addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, 
getPartitionTransformSpec(hmsTable));
+        hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) 
>= 0) {
+      addCustomPartitionTransformExpressions(table, hmsTable, writeOperation, 
dpCtx,
+          getPartitionTransformSpec(hmsTable));
     }
 
+    // Add write sort order expressions as custom sort expressions.
+    // These are used ONLY for sorting within reducers, NOT for distribution.
     SortOrder sortOrder = table.sortOrder();
     if (sortOrder.isSorted()) {
       List<Integer> customSortPositions = Lists.newLinkedList();
@@ -943,7 +948,8 @@ public DynamicPartitionCtx createDPContext(
         }
       }
 
-      addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, 
getSortTransformSpec(table));
+      addCustomWriteSortExpressions(table, hmsTable, writeOperation, dpCtx,
+          getWriteSortTransformSpecs(table));
     }
 
     // Even if table has no explicit sort order, honor z-order if configured
@@ -999,21 +1005,43 @@ private void addZOrderCustomExpr(Map<String, String> 
props, DynamicPartitionCtx
     }
   }
 
-  private void addCustomSortExpr(Table table,  
org.apache.hadoop.hive.ql.metadata.Table hmsTable,
-      Operation writeOperation, DynamicPartitionCtx dpCtx,
-      List<TransformSpec> transformSpecs) {
+  private void addCustomPartitionTransformExpressions(Table table,
+      org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation 
writeOperation,
+      DynamicPartitionCtx dpCtx, List<TransformSpec> transformSpecs) {
+    Map<String, Integer> fieldOrderMap = buildFieldOrderMap(table);
+    int offset = getWriteRowOffset(hmsTable, writeOperation);
+
+    dpCtx.addCustomPartitionExpressions(transformSpecs.stream()
+        .map(spec -> IcebergTransformSortFunctionUtil.getCustomTransformExpr(
+            spec, fieldOrderMap.get(spec.getColumnName()) + offset))
+        .toList());
+  }
+
+  private void addCustomWriteSortExpressions(Table table,
+      org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation 
writeOperation,
+      DynamicPartitionCtx dpCtx, List<TransformSpec> transformSpecs) {
+    Map<String, Integer> fieldOrderMap = buildFieldOrderMap(table);
+    int offset = getWriteRowOffset(hmsTable, writeOperation);
+
+    dpCtx.addCustomSortExpressions(transformSpecs.stream()
+        .map(spec -> IcebergTransformSortFunctionUtil.getCustomTransformExpr(
+            spec, fieldOrderMap.get(spec.getColumnName()) + offset))
+        .toList());
+  }
+
+  private Map<String, Integer> buildFieldOrderMap(Table table) {
     List<Types.NestedField> fields = table.schema().columns();
     Map<String, Integer> fieldOrderMap = 
Maps.newHashMapWithExpectedSize(fields.size());
     for (int i = 0; i < fields.size(); ++i) {
       fieldOrderMap.put(fields.get(i).name(), i);
     }
+    return fieldOrderMap;
+  }
 
-    int offset = (shouldOverwrite(hmsTable, writeOperation) ?
-        ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, 
writeOperation)).size();
-
-    dpCtx.addCustomSortExpressions(transformSpecs.stream().map(spec ->
-        IcebergTransformSortFunctionUtil.getCustomSortExprs(spec, 
fieldOrderMap.get(spec.getColumnName()) + offset)
-    ).collect(Collectors.toList()));
+  private int getWriteRowOffset(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable, Operation writeOperation) {
+    return (shouldOverwrite(hmsTable, writeOperation) ?
+        ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA :
+        acidSelectColumns(hmsTable, writeOperation)).size();
   }
 
   @Override
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
index f2bc36abca4..b0ceba4d2d5 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTransformSortFunctionUtil.java
@@ -37,7 +37,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /**
- * A utility class which provides Iceberg transform sort functions.
+ * Utility for building Iceberg transform expressions used by both 
partitioning and sorting.
  */
 public final class IcebergTransformSortFunctionUtil {
 
@@ -136,7 +136,7 @@ private IcebergTransformSortFunctionUtil() {
             }
           };
 
-  public static Function<List<ExprNodeDesc>, ExprNodeDesc> 
getCustomSortExprs(TransformSpec spec, int index) {
+  public static Function<List<ExprNodeDesc>, ExprNodeDesc> 
getCustomTransformExpr(TransformSpec spec, int index) {
     switch (spec.getTransformType()) {
       case BUCKET:
         return BUCKET_SORT_EXPR.apply(index, spec.getTransformParam());
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
index df79ad94179..fbdd66470bb 100644
--- 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_create_locally_zordered_table.q
@@ -109,3 +109,25 @@ INSERT INTO default.zorder_props VALUES (3, 'B'),(1, 
'A'),(7, 'C'),(2, 'A'),(9,
 DESCRIBE FORMATTED default.zorder_props;
 SELECT * FROM default.zorder_props;
 DROP TABLE default.zorder_props;
+
+-- Validates partition transform (bucket) + z-order sort together
+CREATE TABLE default.zorder_tsdl_test (
+    ts timestamp,
+    dd double,
+    ll int)
+PARTITIONED BY SPEC (bucket(4, ll))
+WRITE ORDERED BY zorder (ts, dd)
+STORED BY iceberg
+STORED As orc;
+
+DESCRIBE FORMATTED default.zorder_tsdl_test;
+EXPLAIN INSERT INTO default.zorder_tsdl_test VALUES (TIMESTAMP '2022-01-01 
00:00:00', 0.0, 0);
+
+INSERT INTO default.zorder_tsdl_test VALUES
+  (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0),
+  (TIMESTAMP '2030-12-31 23:59:59', 9999.99, 1),
+  (TIMESTAMP '2026-06-15 12:00:00', 5000.5, 2);
+
+SELECT * FROM default.zorder_tsdl_test;
+DROP TABLE default.zorder_tsdl_test;
+
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
index e42a27153cb..74a2945b82d 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_ordered_table.q.out
@@ -158,7 +158,6 @@ STAGE PLANS:
                           key expressions: _col0 (type: int), _col1 (type: 
string)
                           null sort order: az
                           sort order: -+
-                          Map-reduce partition columns: _col0 (type: int), 
_col1 (type: string)
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
                           value expressions: _col0 (type: int), _col1 (type: 
string), _col2 (type: int), _col3 (type: string)
                         Select Operator
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
index 83947b35e9a..53293d3798c 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
@@ -264,7 +264,6 @@ STAGE PLANS:
                           key expressions: iceberg_zorder(_col0, _col2) (type: 
binary)
                           null sort order: z
                           sort order: +
-                          Map-reduce partition columns: iceberg_zorder(_col0, 
_col2) (type: binary)
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
                           value expressions: _col0 (type: int), _col1 (type: 
string), _col2 (type: int), _col3 (type: string)
                         Select Operator
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
index d5ab8aafda8..42f0631140f 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
@@ -109,7 +109,6 @@ STAGE PLANS:
                           key expressions: iceberg_zorder(_col0, _col1) (type: 
binary)
                           null sort order: z
                           sort order: +
-                          Map-reduce partition columns: iceberg_zorder(_col0, 
_col1) (type: binary)
                           Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
                           value expressions: _col0 (type: int), _col1 (type: 
string)
                         Select Operator
@@ -610,3 +609,243 @@ POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@zorder_props
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@zorder_props
+PREHOOK: query: CREATE TABLE default.zorder_tsdl_test (
+    ts timestamp,
+    dd double,
+    ll int)
+PARTITIONED BY SPEC (bucket(4, ll))
+WRITE ORDERED BY zorder (ts, dd)
+STORED BY iceberg
+STORED As orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: CREATE TABLE default.zorder_tsdl_test (
+    ts timestamp,
+    dd double,
+    ll int)
+PARTITIONED BY SPEC (bucket(4, ll))
+WRITE ORDERED BY zorder (ts, dd)
+STORED BY iceberg
+STORED As orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@zorder_tsdl_test
+PREHOOK: query: DESCRIBE FORMATTED default.zorder_tsdl_test
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@zorder_tsdl_test
+POSTHOOK: query: DESCRIBE FORMATTED default.zorder_tsdl_test
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@zorder_tsdl_test
+# col_name             data_type               comment             
+ts                     timestamp                                   
+dd                     double                                      
+ll                     int                                         
+                
+# Partition Information                 
+# col_name             data_type               comment             
+ll                     int                     Transform: bucket[4]
+                
+# Partition Transform Information               
+# col_name             transform_type           
+ll                     BUCKET[4]                
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   
{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"dd\":\"true\",\"ll\":\"true\",\"ts\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":2,\"name\":\"dd\",\"required\":false,\"type\":\"double\"},{\"id\":3,\"name\":\"ll\",\"required\":false,\"type\":\"int\"}]}
+       default-partition-spec  
{\"spec-id\":0,\"fields\":[{\"name\":\"ll_bucket\",\"transform\":\"bucket[4]\",\"source-id\":3,\"field-id\":1000}]}
+       format-version          2                   
+       iceberg.orc.files.only  true                
+#### A masked pattern was here ####
+       numFiles                0                   
+       numRows                 0                   
+       parquet.compression     zstd                
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          0                   
+       sort.columns            ts,dd               
+       sort.order              ZORDER              
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#                   
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.format.default    orc                 
+       write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat       
 
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat      
 
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: EXPLAIN INSERT INTO default.zorder_tsdl_test VALUES (TIMESTAMP 
'2022-01-01 00:00:00', 0.0, 0)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: EXPLAIN INSERT INTO default.zorder_tsdl_test VALUES 
(TIMESTAMP '2022-01-01 00:00:00', 0.0, 0)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@zorder_tsdl_test
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: _dummy_table
+                  Row Limit Per Split: 1
+                  Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  Select Operator
+                    expressions: array(const struct(TIMESTAMP'2022-01-01 
00:00:00',0,0)) (type: array<struct<col1:timestamp,col2:decimal(1,0),col3:int>>)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 48 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    UDTF Operator
+                      Statistics: Num rows: 1 Data size: 48 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      function name: inline
+                      Select Operator
+                        expressions: col1 (type: timestamp), UDFToDouble(col2) 
(type: double), col3 (type: int)
+                        outputColumnNames: _col0, _col1, _col2
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          key expressions: iceberg_bucket(_col2, 4) (type: 
int), iceberg_zorder(_col0, _col1) (type: binary)
+                          null sort order: zz
+                          sort order: ++
+                          Map-reduce partition columns: iceberg_bucket(_col2, 
4) (type: int)
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
+                          value expressions: _col0 (type: timestamp), _col1 
(type: double), _col2 (type: int)
+                        Select Operator
+                          expressions: _col0 (type: timestamp), _col1 (type: 
double), _col2 (type: int)
+                          outputColumnNames: ts, dd, ll
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: COMPLETE
+                          Group By Operator
+                            aggregations: min(ts), max(ts), count(1), 
count(ts), compute_bit_vector_hll(ts), min(dd), max(dd), count(dd), 
compute_bit_vector_hll(dd), min(ll), max(ll), count(ll), 
compute_bit_vector_hll(ll)
+                            keys: iceberg_bucket(ll, 4) (type: int)
+                            minReductionHashAggr: 0.4
+                            mode: hash
+                            outputColumnNames: _col0, _col1, _col2, _col3, 
_col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13
+                            Statistics: Num rows: 1 Data size: 572 Basic 
stats: COMPLETE Column stats: COMPLETE
+                            Reduce Output Operator
+                              key expressions: _col0 (type: int)
+                              null sort order: z
+                              sort order: +
+                              Map-reduce partition columns: _col0 (type: int)
+                              Statistics: Num rows: 1 Data size: 572 Basic 
stats: COMPLETE Column stats: COMPLETE
+                              value expressions: _col1 (type: timestamp), 
_col2 (type: timestamp), _col3 (type: bigint), _col4 (type: bigint), _col5 
(type: binary), _col6 (type: double), _col7 (type: double), _col8 (type: 
bigint), _col9 (type: binary), _col10 (type: int), _col11 (type: int), _col12 
(type: bigint), _col13 (type: binary)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: timestamp), VALUE._col1 (type: 
double), VALUE._col2 (type: int), KEY.iceberg_bucket(_col2, 4) (type: int), 
KEY.iceberg_zorder(_col0, _col1) (type: binary)
+                outputColumnNames: _col0, _col1, _col2, iceberg_bucket(_col2, 
4), iceberg_zorder(_col0, _col1)
+                File Output Operator
+                  compressed: false
+                  Dp Sort State: PARTITION_SORTED
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  table:
+                      input format: 
org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                      output format: 
org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                      serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                      name: default.zorder_tsdl_test
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), 
count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), 
min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), 
compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), 
count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, 
_col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13
+                Statistics: Num rows: 1 Data size: 572 Basic stats: COMPLETE 
Column stats: COMPLETE
+                Select Operator
+                  expressions: 'TIMESTAMP' (type: string), _col1 (type: 
timestamp), _col2 (type: timestamp), (_col3 - _col4) (type: bigint), 
COALESCE(ndv_compute_bit_vector(_col5),0) (type: bigint), _col5 (type: binary), 
'DOUBLE' (type: string), _col6 (type: double), _col7 (type: double), (_col3 - 
_col8) (type: bigint), COALESCE(ndv_compute_bit_vector(_col9),0) (type: 
bigint), _col9 (type: binary), 'LONG' (type: string), UDFToLong(_col10) (type: 
bigint), UDFToLong(_col11) (type: bigint), [...]
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, 
_col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, 
_col16, _col17, _col18
+                  Statistics: Num rows: 1 Data size: 907 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1 Data size: 907 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+              name: default.zorder_tsdl_test
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+      Column Stats Desc:
+          Columns: ts, dd, ll
+          Column Types: timestamp, double, int
+          Table: default.zorder_tsdl_test
+
+PREHOOK: query: INSERT INTO default.zorder_tsdl_test VALUES
+  (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0),
+  (TIMESTAMP '2030-12-31 23:59:59', 9999.99, 1),
+  (TIMESTAMP '2026-06-15 12:00:00', 5000.5, 2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: INSERT INTO default.zorder_tsdl_test VALUES
+  (TIMESTAMP '2022-01-01 00:00:00', 0.0, 0),
+  (TIMESTAMP '2030-12-31 23:59:59', 9999.99, 1),
+  (TIMESTAMP '2026-06-15 12:00:00', 5000.5, 2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@zorder_tsdl_test
+PREHOOK: query: SELECT * FROM default.zorder_tsdl_test
+PREHOOK: type: QUERY
+PREHOOK: Input: default@zorder_tsdl_test
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM default.zorder_tsdl_test
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@zorder_tsdl_test
+#### A masked pattern was here ####
+2022-01-01 00:00:00    0.0     0
+2026-06-15 12:00:00    5000.5  2
+2030-12-31 23:59:59    9999.99 1
+PREHOOK: query: DROP TABLE default.zorder_tsdl_test
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@zorder_tsdl_test
+PREHOOK: Output: database:default
+PREHOOK: Output: default@zorder_tsdl_test
+POSTHOOK: query: DROP TABLE default.zorder_tsdl_test
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@zorder_tsdl_test
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@zorder_tsdl_test
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2c038d17b83..5d3c2dccf5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -640,7 +640,7 @@ protected void initializeOp(Configuration hconf) throws 
HiveException {
       jc = new JobConf(hconf);
       setWriteOperation(jc, getConf().getTableInfo().getTableName(), 
getConf().getWriteOperation());
       setWriteOperationIsSorted(jc, getConf().getTableInfo().getTableName(),
-              dpCtx != null && dpCtx.hasCustomSortExpression());
+              dpCtx != null && dpCtx.hasCustomPartitionOrSortExpression());
       setMergeTaskEnabled(jc, getConf().getTableInfo().getTableName(),
           Boolean.parseBoolean((String) 
getConf().getTableInfo().getProperties().get(
               MERGE_TASK_ENABLED + getConf().getTableInfo().getTableName())));
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 60be9c9466d..a057f4137e3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -31,6 +31,7 @@
 
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.conf.Constants;
@@ -202,13 +203,15 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
       }
 
       List<Integer> partitionPositions = getPartitionPositions(dpCtx, 
fsParent.getSchema());
+      LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>> 
customPartitionExprs =
+          new LinkedList<>(dpCtx.getCustomPartitionExpressions());
       LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs =
           new LinkedList<>(dpCtx.getCustomSortExpressions());
       LinkedList<Integer> customSortOrder = new 
LinkedList<>(dpCtx.getCustomSortOrder());
       LinkedList<Integer> customNullOrder = new 
LinkedList<>(dpCtx.getCustomSortNullOrder());
 
-      // If custom sort expressions are present, there is an explicit 
requirement to do sorting
-      if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, 
fsParent)) {
+      // If custom expressions (partition or sort) are present, there is an 
explicit requirement to do sorting
+      if (customPartitionExprs.isEmpty() && customSortExprs.isEmpty() && 
!shouldDo(partitionPositions, fsParent)) {
         return null;
       }
       // if RS is inserted by enforce bucketing or sorting, we need to remove 
it
@@ -308,8 +311,8 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
 
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, 
sortPositions, sortOrder,
-          sortNullOrder, customSortExprs, customSortOrder, customNullOrder, 
allRSCols, bucketColumns, numBuckets,
-          fsParent, fsOp.getConf().getWriteType());
+          sortNullOrder, customPartitionExprs, customSortExprs, 
customSortOrder, customNullOrder,
+          allRSCols, bucketColumns, numBuckets, fsParent, 
fsOp.getConf().getWriteType());
       // we have to make sure not to reorder the child operators as it might 
cause weird behavior in the tasks at
       // the same level. when there is auto stats gather at the same level as 
another operation then it might
       // cause unnecessary preemption. Maintaining the order here to avoid 
such preemption and possible errors
@@ -349,19 +352,18 @@ public Object process(Node nd, Stack<Node> stack, 
NodeProcessorCtx procCtx,
         customSortExprs.add(BUCKET_SORT_EXPRESSION);
       }
 
-      for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : 
customSortExprs) {
-        ExprNodeDesc colExpr = customSortExpr.apply(allRSCols);
-        String customSortColName = colExpr.getExprString();
-        TypeInfo customSortColTypeInfo = colExpr.getTypeInfo();
-
-        descs.add(new ExprNodeColumnDesc(customSortColTypeInfo, 
ReduceField.KEY + "." + customSortColName,
-            null, false));
-        colNames.add(customSortColName);
-        ColumnInfo ci = new ColumnInfo(
-            customSortColName, customSortColTypeInfo, 
selRS.getSignature().get(0).getTabAlias(), true, true);
-        selRS.getSignature().add(ci);
-        rsOp.getSchema().getSignature().add(ci);
-      }
+      Stream.concat(customPartitionExprs.stream(), customSortExprs.stream())
+          .forEach(customExpr -> {
+            ExprNodeDesc colExpr = customExpr.apply(allRSCols);
+            String columnName = colExpr.getExprString();
+            TypeInfo colTypeInfo = colExpr.getTypeInfo();
+            descs.add(new ExprNodeColumnDesc(colTypeInfo, ReduceField.KEY + 
"." + columnName, null, false));
+            colNames.add(columnName);
+            ColumnInfo ci = new ColumnInfo(
+                    columnName, colTypeInfo, 
selRS.getSignature().get(0).getTabAlias(), true, true);
+            selRS.getSignature().add(ci);
+            rsOp.getSchema().getSignature().add(ci);
+          });
 
       // Create SelectDesc
       SelectDesc selConf = new SelectDesc(descs, colNames);
@@ -410,29 +412,30 @@ private boolean allStaticPartitions(Operator<? extends OperatorDesc> op, List<Ex
         return false;
       }
 
-      List<String> referencedSortColumnNames = new LinkedList<>();
-      List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = 
dynPartCtx.getCustomSortExpressions();
+      List<String> referencedPartitionColumnNames = new LinkedList<>();
+      List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs =
+          dynPartCtx.getCustomPartitionExpressions();
 
-      if (customSortExprs != null && !customSortExprs.isEmpty()) {
+      if (!customPartitionExprs.isEmpty()) {
         Set<ExprNodeColumnDesc> columnDescs = new HashSet<>();
 
-        // Find relevant column descs (e.g. _col0, _col2) for each sort expression
-        for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
-          ExprNodeDesc sortExpressionForRSSchema = customSortExpr.apply(allRSCols);
-          columnDescs.addAll(ExprNodeDescUtils.findAllColumnDescs(sortExpressionForRSSchema));
+        // Find relevant column descs (e.g. _col0, _col2) for each partition expression
+        for (Function<List<ExprNodeDesc>, ExprNodeDesc> customPartitionExpr : customPartitionExprs) {
+          ExprNodeDesc partExpressionForRSSchema = customPartitionExpr.apply(allRSCols);
+          columnDescs.addAll(ExprNodeDescUtils.findAllColumnDescs(partExpressionForRSSchema));
         }
 
         for (ExprNodeColumnDesc columnDesc : columnDescs) {
-          referencedSortColumnNames.add(columnDesc.getColumn());
+          referencedPartitionColumnNames.add(columnDesc.getColumn());
         }
 
       } else {
         int numDpCols = dynPartCtx.getNumDPCols();
         int numCols = op.getSchema().getColumnNames().size();
-        referencedSortColumnNames.addAll(op.getSchema().getColumnNames().subList(numCols - numDpCols, numCols));
+        referencedPartitionColumnNames.addAll(op.getSchema().getColumnNames().subList(numCols - numDpCols, numCols));
       }
 
-      for(String dpCol : referencedSortColumnNames) {
+      for (String dpCol : referencedPartitionColumnNames) {
         ExprNodeDesc end = ExprNodeDescUtils.findConstantExprOrigin(dpCol, op);
         if (!(end instanceof ExprNodeConstantDesc)) {
           // There is at least 1 column with no constant mapping -> we will 
need to do the sorting
@@ -440,11 +443,17 @@ private boolean allStaticPartitions(Operator<? extends OperatorDesc> op, List<Ex
         }
       }
 
+      // Custom sort expressions require SDPO for sorting
+      if (!dynPartCtx.getCustomSortExpressions().isEmpty()) {
+        return false;
+      }
+
+      LOG.debug("SDPO: all dynamic partition columns constant folded: {}", 
referencedPartitionColumnNames);
       // All columns had constant mappings
       return true;
     }
 
-    // Remove RS and SEL introduced by enforce bucketing/sorting config
+    // Remove RS and SEL introduced by enforce bucketing/sorting conf
     // Convert PARENT -> RS -> SEL -> FS to PARENT -> FS
     private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
 
@@ -580,25 +589,28 @@ private void inferSortPositions(Operator<? extends OperatorDesc> fsParent,
 
     public ReduceSinkOperator getReduceSinkOp(List<Integer> 
partitionPositions, List<Integer> sortPositions,
         List<Integer> sortOrder, List<Integer> sortNullOrder,
+        List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customPartitionExprs,
         List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
         List<Integer> customSortOrder, List<Integer> customSortNullOrder,
         ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
         int numBuckets, Operator<? extends OperatorDesc> parent, 
AcidUtils.Operation writeType) {
 
-      // Order of KEY columns, if custom sort is present partition and bucket 
columns are disregarded:
-      // 0) Custom sort expressions
-      //                              1) Partition columns
-      //                              2) Bucket number column
-      //                 3) Sort columns
+      // Order of KEY columns, if custom expressions are present:
+      // 0) Custom partition expressions (for distribution AND sorting)
+      // 1) Custom sort expressions (for sorting ONLY)
+      // 2) Partition columns
+      // 3) Bucket number column
+      // 4) Sort columns
 
+      boolean customPartitionExprPresent = customPartitionExprs != null && 
!customPartitionExprs.isEmpty();
       boolean customSortExprPresent = customSortExprs != null && 
!customSortExprs.isEmpty();
+      boolean customExprPresent = customPartitionExprPresent || 
customSortExprPresent;
 
       Set<Integer> keyColsPosInVal = Sets.newLinkedHashSet();
       ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList();
       List<Integer> newSortOrder = Lists.newArrayList();
-      List<Integer> newSortNullOrder = Lists.newArrayList();
 
-      if (customSortExprPresent) {
+      if (customPartitionExprPresent) {
         partitionPositions = new ArrayList<>();
         bucketColumns = new ArrayList<>();
         numBuckets = -1;
@@ -618,15 +630,22 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
         }
       }
 
-      for (Integer ignored : keyColsPosInVal) {
-        newSortOrder.add(order);
-      }
-
-      if (customSortExprPresent) {
-        for (int i = 0; i < customSortExprs.size() - customSortOrder.size(); 
i++) {
+      if (customExprPresent) {
+        int numPartitionExprs = customPartitionExprs != null ? 
customPartitionExprs.size() : 0;
+        for (int i = 0; i < numPartitionExprs; i++) {
           newSortOrder.add(order);
         }
-        newSortOrder.addAll(customSortOrder);
+
+        int numSortExprs = customSortExprs != null ? customSortExprs.size() : 
0;
+        for (int i = 0; i < numSortExprs; i++) {
+          newSortOrder.add(customSortOrder != null && i < 
customSortOrder.size() ?
+                  customSortOrder.get(i) :
+                  order);
+        }
+      }
+
+      for (Integer ignored : keyColsPosInVal) {
+        newSortOrder.add(order);
       }
 
       String orderStr = "";
@@ -643,7 +662,10 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
         nullOrder = NullOrdering.fromCode(sortNullOrder.get(0)).getSign();
       }
 
-      StringBuilder nullOrderStr = new 
StringBuilder(StringUtils.repeat(nullOrder, keyColsPosInVal.size()));
+      StringBuilder nullOrderStr = new StringBuilder();
+      if (customPartitionExprPresent) {
+        nullOrderStr.append(StringUtils.repeat(nullOrder, 
customPartitionExprs.size()));
+      }
       if (customSortExprPresent) {
         for (int i = 0; i < customSortExprs.size() - 
customSortNullOrder.size(); i++) {
           nullOrderStr.append(nullOrder);
@@ -652,17 +674,28 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
           nullOrderStr.append(NullOrdering.fromCode(customSortNullOrder.get(i)).getSign());
         }
       }
+      nullOrderStr.append(StringUtils.repeat(nullOrder, 
keyColsPosInVal.size()));
 
       Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
       ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
 
-      for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : 
customSortExprs) {
-        ExprNodeDesc colExpr = customSortExpr.apply(allCols);
-        // Custom sort expressions are marked as KEYs, which is required for 
sorting the rows that are going for
-        // a particular reducer instance. They also need to be marked as 
'partition' columns for MapReduce shuffle
-        // phase, in order to gather the same keys to the same reducer 
instances.
-        keyCols.add(colExpr);
-        partCols.add(colExpr);
+      // Process custom partition expressions (e.g., Iceberg partition 
transforms).
+      // These are used for BOTH distribution and sorting.
+      if (customPartitionExprs != null) {
+        for (Function<List<ExprNodeDesc>, ExprNodeDesc> partExpr : 
customPartitionExprs) {
+          ExprNodeDesc colExpr = partExpr.apply(allCols);
+          keyCols.add(colExpr);    // Add to sort keys
+          partCols.add(colExpr);   // Add to distribution keys
+        }
+      }
+
+      // Process custom sort expressions (e.g., Z-order).
+      // These are used ONLY for sorting, NOT for distribution.
+      if (customSortExprs != null) {
+        for (Function<List<ExprNodeDesc>, ExprNodeDesc> sortExpr : 
customSortExprs) {
+          ExprNodeDesc colExpr = sortExpr.apply(allCols);
+          keyCols.add(colExpr);    // Add to sort keys only
+        }
       }
 
       // we will clone here as RS will update bucket column key with its
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 1eda24ec3b6..61c519aa62f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,6 +19,7 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -53,7 +54,8 @@ public class DynamicPartitionCtx implements Serializable {
   private String defaultPartName; // default partition name in case of null or 
empty value
   private int maxPartsPerNode;    // maximum dynamic partitions created per 
mapper/reducer
   private Pattern whiteListPattern;
-  private boolean hasCustomSortExpr = false;
+  private boolean hasCustomPartitionOrSortExpr = false;
+  private transient List<Function<List<ExprNodeDesc>, ExprNodeDesc>> 
customPartitionExpressions;
   /**
    * Expressions describing a custom way of sorting the table before write. 
Expressions can reference simple
    * column descriptions or a tree of expressions containing more columns and 
UDFs.
@@ -131,6 +133,7 @@ public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
       throw new SemanticException(e);
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : 
Pattern.compile(confVal);
+    this.customPartitionExpressions = new LinkedList<>();
     this.customSortExpressions = new LinkedList<>();
     this.customSortOrder = new LinkedList<>();
     this.customSortNullOrder = new LinkedList<>();
@@ -148,6 +151,8 @@ public DynamicPartitionCtx(DynamicPartitionCtx dp) {
     this.defaultPartName = dp.defaultPartName;
     this.maxPartsPerNode = dp.maxPartsPerNode;
     this.whiteListPattern = dp.whiteListPattern;
+    this.customPartitionExpressions = new LinkedList<>();
+    addCustomPartitionExpressions(dp.customPartitionExpressions);
     this.customSortExpressions = new LinkedList<>();
     addCustomSortExpressions(dp.customSortExpressions);
     this.customSortOrder = dp.customSortOrder;
@@ -238,13 +243,30 @@ public String getSPPath() {
     return this.spPath;
   }
 
+  public List<Function<List<ExprNodeDesc>, ExprNodeDesc>> 
getCustomPartitionExpressions() {
+    return customPartitionExpressions == null
+        ? Collections.emptyList()
+        : customPartitionExpressions;
+  }
+
+  public void addCustomPartitionExpressions(
+          List<Function<List<ExprNodeDesc>, ExprNodeDesc>> 
customPartitionExpressions) {
+    if (!org.apache.commons.collections.CollectionUtils.isEmpty(customPartitionExpressions)) {
+      this.hasCustomPartitionOrSortExpr = true;
+      this.customPartitionExpressions.addAll(customPartitionExpressions);
+    }
+  }
+
   public List<Function<List<ExprNodeDesc>, ExprNodeDesc>> 
getCustomSortExpressions() {
-    return customSortExpressions;
+    return customSortExpressions == null
+        ? Collections.emptyList()
+        : customSortExpressions;
   }
 
-  public void addCustomSortExpressions(List<Function<List<ExprNodeDesc>, 
ExprNodeDesc>> customSortExpressions) {
+  public void addCustomSortExpressions(
+          List<Function<List<ExprNodeDesc>, ExprNodeDesc>> 
customSortExpressions) {
     if (!CollectionUtils.isEmpty(customSortExpressions)) {
-      this.hasCustomSortExpr = true;
+      this.hasCustomPartitionOrSortExpr = true;
       this.customSortExpressions.addAll(customSortExpressions);
     }
   }
@@ -265,7 +287,7 @@ public void setCustomSortNullOrder(List<Integer> customSortNullOrder) {
     this.customSortNullOrder = customSortNullOrder;
   }
 
-  public boolean hasCustomSortExpression() {
-    return hasCustomSortExpr;
+  public boolean hasCustomPartitionOrSortExpression() {
+    return hasCustomPartitionOrSortExpr;
   }
 }


Reply via email to