This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7ea6713c feat(datafusion): implement the partitioning node for DataFusion to define the partitioning (#1620)
7ea6713c is described below
commit 7ea6713c738db2b9738778a6bed92b97d3a55257
Author: Florian Valeye <[email protected]>
AuthorDate: Fri Oct 31 10:33:17 2025 +0100
feat(datafusion): implement the partitioning node for DataFusion to define the partitioning (#1620)
## Which issue does this PR close?
- Closes #1543
## What changes are included in this PR?
Implement a physical execution repartition node that determines the
relevant DataFusion partitioning strategy based on the Iceberg table
schema and metadata.
1. Unpartitioned tables: Uses round-robin partitioning
2. Partitioned tables: the strategy depends on the transform type (see the sketch below this list):
- Identity or Bucket transforms: Uses hash partitioning on the
_partition column
- Temporal transforms (Year, Month, Day, Hour): Uses round-robin
partitioning
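
In sketch form, the selection reduces to the following (condensed from `determine_partitioning_strategy` in the diff below; `partition_expr` and `target_partition_count` stand in for the real locals, and the error path for a partitioned table missing the `_partition` column is omitted):

```rust
// Hash-friendly = at least one Identity or Bucket transform in the spec.
let hash_friendly = partition_spec
    .fields()
    .iter()
    .any(|pf| matches!(pf.transform, Transform::Identity | Transform::Bucket(_)));

let partitioning = if !partition_spec.is_unpartitioned() && hash_friendly {
    // Rows with equal `_partition` values end up in the same task.
    Partitioning::Hash(vec![partition_expr], target_partition_count)
} else {
    // Unpartitioned tables and temporal-only specs balance load instead.
    Partitioning::RoundRobinBatch(target_partition_count)
};
```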
_Minor change: I created a new `current_schema_ref()` helper method._
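
For reference, the helper is equivalent to cloning the schema `Arc` by hand, as the new `test_schema_ref_convenience_method` unit test verifies:

```rust
// Both bindings hold the same Arc<Schema>; the helper just shortens the call.
let by_hand: SchemaRef = Arc::clone(table.metadata().current_schema());
let via_helper: SchemaRef = table.current_schema_ref();
assert!(Arc::ptr_eq(&by_hand, &via_helper));
```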
## Are these changes tested?
Yes, with unit tests covering unpartitioned tables, Identity/Bucket transforms, temporal transforms, mixed specs, and the `NonZeroUsize` zero-partition guard.
---------
Signed-off-by: Florian Valeye <[email protected]>
---
crates/iceberg/src/table.rs | 7 +-
.../datafusion/src/physical_plan/mod.rs | 1 +
.../datafusion/src/physical_plan/repartition.rs | 886 +++++++++++++++++++++
3 files changed, 893 insertions(+), 1 deletion(-)
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index d4e696ce..80e10b2f 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -24,7 +24,7 @@ use crate::inspect::MetadataTable;
use crate::io::FileIO;
use crate::io::object_cache::ObjectCache;
use crate::scan::TableScanBuilder;
-use crate::spec::{TableMetadata, TableMetadataRef};
+use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef};
use crate::{Error, ErrorKind, Result, TableIdent};
/// Builder to create table scan.
@@ -235,6 +235,11 @@ impl Table {
self.readonly
}
+ /// Returns the current schema as a shared reference.
+ pub fn current_schema_ref(&self) -> SchemaRef {
+ self.metadata.current_schema().clone()
+ }
+
/// Create a reader for the table.
pub fn reader_builder(&self) -> ArrowReaderBuilder {
ArrowReaderBuilder::new(self.file_io.clone())
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index ce923b86..eb58082f 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -19,6 +19,7 @@ pub(crate) mod commit;
pub(crate) mod expr_to_predicate;
pub(crate) mod metadata_scan;
pub(crate) mod project;
+pub(crate) mod repartition;
pub(crate) mod scan;
pub(crate) mod write;
diff --git a/crates/integrations/datafusion/src/physical_plan/repartition.rs b/crates/integrations/datafusion/src/physical_plan/repartition.rs
new file mode 100644
index 00000000..95cdc847
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/repartition.rs
@@ -0,0 +1,886 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::expressions::Column;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN;
+use iceberg::spec::{TableMetadata, TableMetadataRef, Transform};
+/// Creates an Iceberg-aware repartition execution plan that optimizes data distribution
+/// for parallel processing while respecting Iceberg table partitioning semantics.
+///
+/// Automatically determines the optimal partitioning strategy based on the table's
+/// partition specification.
+///
+/// ## Partitioning Strategies
+///
+/// - **Partitioned tables with Identity/Bucket transforms** – Uses hash partitioning on the
+///   `_partition` column for optimal data distribution and file clustering. Ensures that rows
+///   with the same partition values are co-located in the same task.
+///
+/// - **Partitioned tables with temporal transforms** – Uses round-robin partitioning for
+///   temporal transforms (Year, Month, Day, Hour) that don't provide uniform hash distribution.
+///
+/// - **Unpartitioned tables** – Uses round-robin distribution to balance load evenly across workers.
+///
+/// ## Requirements
+///
+/// - **For partitioned tables**: The input MUST include the `_partition` column.
+///   Add it by calling [`project_with_partition`](crate::physical_plan::project_with_partition) before [`repartition`].
+/// - **For unpartitioned tables**: No special preparation needed.
+/// - Returns an error if a partitioned table is missing the `_partition` column.
+///
+/// ## Performance Notes
+///
+/// - Only adds repartitioning when the input partitioning differs from the target.
+/// - Requires an explicit target partition count for deterministic behavior.
+///
+/// # Arguments
+///
+/// * `input` - The input [`ExecutionPlan`]. For partitioned tables, must include the `_partition`
+///   column (added via [`project_with_partition`](crate::physical_plan::project_with_partition)).
+/// * `table_metadata` - Iceberg table metadata containing partition spec.
+/// * `target_partitions` - Target number of partitions for parallel processing (must be > 0).
+///
+/// # Returns
+///
+/// An [`ExecutionPlan`] that applies the optimal partitioning strategy, or the original input plan
+/// if repartitioning is not needed.
+///
+/// # Errors
+///
+/// Returns [`DataFusionError::Plan`] if a partitioned table input is missing the `_partition` column.
+///
+/// # Examples
+///
+/// For partitioned tables, first add the `_partition` column:
+///
+/// ```ignore
+/// use std::num::NonZeroUsize;
+/// use iceberg_datafusion::physical_plan::project_with_partition;
+///
+/// let plan_with_partition = project_with_partition(input_plan, &table)?;
+///
+/// let repartitioned_plan = repartition(
+/// plan_with_partition,
+/// table.metadata_ref(),
+/// NonZeroUsize::new(4).unwrap(),
+/// )?;
+/// ```
+#[allow(dead_code)]
+pub(crate) fn repartition(
+ input: Arc<dyn ExecutionPlan>,
+ table_metadata: TableMetadataRef,
+ target_partitions: NonZeroUsize,
+) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let partitioning_strategy =
+        determine_partitioning_strategy(&input, &table_metadata, target_partitions)?;
+
+ Ok(Arc::new(RepartitionExec::try_new(
+ input,
+ partitioning_strategy,
+ )?))
+}
+
+/// Determine the optimal partitioning strategy based on table metadata.
+///
+/// Analyzes the table's partition specification to select the most appropriate
+/// DataFusion partitioning strategy for insert operations.
+///
+/// ## Partitioning Strategy
+///
+/// - **Partitioned tables**: Must have the `_partition` column in the input schema (added via
+///   `project_with_partition`). Uses hash partitioning if the partition spec contains Identity
+///   or Bucket transforms for good data distribution. Falls back to round-robin for temporal
+///   transforms (Year, Month, Day, Hour) that don't provide uniform hash distribution.
+///
+/// - **Unpartitioned tables**: Always uses round-robin batch partitioning to ensure even load
+///   distribution across workers.
+///
+/// ## Requirements
+///
+/// - **For partitioned tables**: The input MUST include the `_partition` column
+///   (added via `project_with_partition()`).
+/// - **For unpartitioned tables**: No special preparation needed.
+/// - Returns an error if a partitioned table is missing the `_partition` column.
+fn determine_partitioning_strategy(
+ input: &Arc<dyn ExecutionPlan>,
+ table_metadata: &TableMetadata,
+ target_partitions: NonZeroUsize,
+) -> DFResult<Partitioning> {
+ let partition_spec = table_metadata.default_partition_spec();
+ let input_schema = input.schema();
+ let target_partition_count = target_partitions.get();
+
+ // Check if partition spec has transforms suitable for hash partitioning
+ let has_hash_friendly_transforms = partition_spec
+ .fields()
+ .iter()
+        .any(|pf| matches!(pf.transform, Transform::Identity | Transform::Bucket(_)));
+
+    let partition_col_result = input_schema.index_of(PROJECTED_PARTITION_VALUE_COLUMN);
+ let is_partitioned_table = !partition_spec.is_unpartitioned();
+
+ match (is_partitioned_table, partition_col_result) {
+ // Case 1: Partitioned table with _partition column present
+ (true, Ok(partition_col_idx)) => {
+ let partition_expr = Arc::new(Column::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ partition_col_idx,
+ )) as Arc<dyn PhysicalExpr>;
+
+ if has_hash_friendly_transforms {
+ Ok(Partitioning::Hash(
+ vec![partition_expr],
+ target_partition_count,
+ ))
+ } else {
+ Ok(Partitioning::RoundRobinBatch(target_partition_count))
+ }
+ }
+
+        // Case 2: Partitioned table missing _partition column (normally this should not happen)
+ (true, Err(_)) => Err(DataFusionError::Plan(format!(
+ "Partitioned table input missing {} column. \
+ Ensure projection happens before repartitioning.",
+ PROJECTED_PARTITION_VALUE_COLUMN
+ ))),
+
+ // Case 3: Unpartitioned table, always use RoundRobinBatch
+        (false, _) => Ok(Partitioning::RoundRobinBatch(target_partition_count)),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+    use datafusion::arrow::datatypes::{
+        DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, TimeUnit,
+    };
+ use datafusion::execution::TaskContext;
+ use datafusion::physical_plan::empty::EmptyExec;
+ use iceberg::TableIdent;
+ use iceberg::io::FileIO;
+    use iceberg::spec::{
+        NestedField, NullOrder, PrimitiveType, Schema, SortDirection, SortField, SortOrder,
+        Transform, Type,
+    };
+ use iceberg::table::Table;
+
+ use super::*;
+
+ fn create_test_table() -> Table {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::required(
+ 2,
+ "data",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .build()
+ .unwrap();
+        let sort_order = iceberg::spec::SortOrder::builder().build(&schema).unwrap();
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/table".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ std::collections::HashMap::new(),
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+
+ Table::builder()
+ .metadata(table_metadata.metadata)
+ .identifier(TableIdent::from_strs(["test", "table"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .metadata_location("/test/metadata.json".to_string())
+ .build()
+ .unwrap()
+ }
+
+ fn create_test_arrow_schema() -> Arc<ArrowSchema> {
+ Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int64, false),
+ ArrowField::new("data", ArrowDataType::Utf8, false),
+ ]))
+ }
+
+ #[tokio::test]
+ async fn test_repartition_unpartitioned_table() {
+ let table = create_test_table();
+ let input = Arc::new(EmptyExec::new(create_test_arrow_schema()));
+
+ let repartitioned_plan = repartition(
+ input.clone(),
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+ assert_ne!(input.name(), repartitioned_plan.name());
+ assert_eq!(repartitioned_plan.name(), "RepartitionExec");
+ }
+
+ #[tokio::test]
+ async fn test_repartition_explicit_partitions() {
+ let table = create_test_table();
+ let input = Arc::new(EmptyExec::new(create_test_arrow_schema()));
+
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(8).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ match partitioning {
+ Partitioning::RoundRobinBatch(n) => {
+ assert_eq!(*n, 8);
+ }
+ _ => panic!("Expected RoundRobinBatch partitioning"),
+ }
+ }
+
+ #[tokio::test]
+ async fn test_repartition_zero_partitions_fails() {
+ let _table = create_test_table();
+ let _input = Arc::new(EmptyExec::new(create_test_arrow_schema()));
+
+ let result = std::num::NonZeroUsize::new(0);
+ assert!(result.is_none(), "NonZeroUsize::new(0) should return None");
+
+ // Test that we can't call repartition with 0 partitions
+ // This is prevented at compile time by NonZeroUsize
+ let _ = result; // This would be None, so we can't call repartition
+ }
+
+ #[tokio::test]
+ async fn test_partition_count_validation() {
+ let table = create_test_table();
+ let input = Arc::new(EmptyExec::new(create_test_arrow_schema()));
+
+ let target_partitions = 16;
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(target_partitions).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ match partitioning {
+ Partitioning::RoundRobinBatch(n) => {
+ assert_eq!(*n, target_partitions);
+ }
+ _ => panic!("Expected RoundRobinBatch partitioning"),
+ }
+ }
+
+ #[tokio::test]
+ async fn test_datafusion_repartitioning_integration() {
+ let table = create_test_table();
+ let input = Arc::new(EmptyExec::new(create_test_arrow_schema()));
+
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(3).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ match partitioning {
+ Partitioning::RoundRobinBatch(n) => {
+                assert_eq!(*n, 3, "Should use round-robin for unpartitioned table");
+ }
+            _ => panic!("Expected RoundRobinBatch partitioning for unpartitioned table"),
+ }
+
+ let task_ctx = Arc::new(TaskContext::default());
+ let stream = repartitioned_plan.execute(0, task_ctx.clone()).unwrap();
+
+ assert!(!stream.schema().fields().is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_bucket_aware_partitioning() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::required(
+ 2,
+ "category",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+ let sort_order = SortOrder::builder()
+ .with_order_id(1)
+ .with_sort_field(SortField {
+ source_id: 2,
+ transform: Transform::Bucket(4),
+ direction: SortDirection::Ascending,
+ null_order: NullOrder::First,
+ })
+ .build(&schema)
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .build()
+ .unwrap();
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/bucketed_table".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ std::collections::HashMap::new(),
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+ let table = Table::builder()
+ .metadata(table_metadata.metadata)
+            .identifier(TableIdent::from_strs(["test", "bucketed_table"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .metadata_location("/test/bucketed_metadata.json".to_string())
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("id", ArrowDataType::Int64, false),
+ ArrowField::new("category", ArrowDataType::Utf8, false),
+ ]));
+ let input = Arc::new(EmptyExec::new(arrow_schema));
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+        // For bucketed tables without _partition column, should use round-robin
+ // since the new logic prioritizes _partition column when available
+ match partitioning {
+ Partitioning::Hash(_, _) => {
+ // This would happen if _partition column is present
+ }
+ Partitioning::RoundRobinBatch(_) => {
+ // This happens when _partition column is not present
+ }
+ _ => panic!("Unexpected partitioning strategy"),
+ }
+ }
+
+ #[tokio::test]
+ async fn test_combined_partition_and_bucket_strategy() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "date",
+ Type::Primitive(PrimitiveType::Date),
+ )),
+ Arc::new(NestedField::required(
+ 2,
+ "user_id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::required(
+ 3,
+ "amount",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .add_partition_field("date", "date", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let sort_order = SortOrder::builder()
+ .with_order_id(1)
+ .with_sort_field(SortField {
+ source_id: 2,
+ transform: Transform::Bucket(8),
+ direction: SortDirection::Ascending,
+ null_order: NullOrder::First,
+ })
+ .build(&schema)
+ .unwrap();
+
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/partitioned_bucketed_table".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ std::collections::HashMap::new(),
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+ let table = Table::builder()
+ .metadata(table_metadata.metadata)
+            .identifier(TableIdent::from_strs(["test", "partitioned_bucketed_table"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+            .metadata_location("/test/partitioned_bucketed_metadata.json".to_string())
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("date", ArrowDataType::Date32, false),
+ ArrowField::new("user_id", ArrowDataType::Int64, false),
+ ArrowField::new("amount", ArrowDataType::Int64, false),
+ ArrowField::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ ArrowDataType::Struct(Fields::empty()),
+ false,
+ ),
+ ]));
+ let input = Arc::new(EmptyExec::new(arrow_schema));
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ match partitioning {
+ Partitioning::Hash(exprs, _) => {
+ // Should use _partition column for hash partitioning
+ assert_eq!(
+ exprs.len(),
+ 1,
+ "Should have exactly one hash column (_partition)"
+ );
+
+ let column_names: Vec<String> = exprs
+ .iter()
+ .filter_map(|expr| {
+ expr.as_any()
+ .downcast_ref::<Column>()
+ .map(|col| col.name().to_string())
+ })
+ .collect();
+
+ assert!(
+                    column_names.contains(&PROJECTED_PARTITION_VALUE_COLUMN.to_string()),
+ "Should use _partition column, got: {:?}",
+ column_names
+ );
+ }
+ _ => panic!("Expected Hash partitioning with Identity transform"),
+ }
+ }
+
+ #[tokio::test]
+ async fn test_none_distribution_mode_fallback() {
+ let schema = Schema::builder()
+ .with_fields(vec![Arc::new(NestedField::required(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ ))])
+ .build()
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .build()
+ .unwrap();
+        let sort_order = iceberg::spec::SortOrder::builder().build(&schema).unwrap();
+
+ let mut properties = std::collections::HashMap::new();
+        properties.insert("write.distribution-mode".to_string(), "none".to_string());
+
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/none_table".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ properties,
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+ let table = Table::builder()
+ .metadata(table_metadata.metadata)
+ .identifier(TableIdent::from_strs(["test", "none_table"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .metadata_location("/test/none_metadata.json".to_string())
+ .build()
+ .unwrap();
+
+ let input = Arc::new(EmptyExec::new(create_test_arrow_schema()));
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ assert!(
+ matches!(partitioning, Partitioning::RoundRobinBatch(_)),
+ "Should use round-robin for 'none' distribution mode"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_schema_ref_convenience_method() {
+ let table = create_test_table();
+
+ let schema_ref_1 = table.current_schema_ref();
+ let schema_ref_2 = Arc::clone(table.metadata().current_schema());
+
+ assert!(
+ Arc::ptr_eq(&schema_ref_1, &schema_ref_2),
+            "current_schema_ref() should return the same Arc as the manual approach"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_range_only_partitions_use_round_robin() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "date",
+ Type::Primitive(PrimitiveType::Date),
+ )),
+ Arc::new(NestedField::required(
+ 2,
+ "amount",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .add_partition_field("date", "date_day", Transform::Day)
+ .unwrap()
+ .build()
+ .unwrap();
+
+        let sort_order = iceberg::spec::SortOrder::builder().build(&schema).unwrap();
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/range_only_table".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ std::collections::HashMap::new(),
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+ let table = Table::builder()
+ .metadata(table_metadata.metadata)
+            .identifier(TableIdent::from_strs(["test", "range_only_table"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .metadata_location("/test/range_only_metadata.json".to_string())
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("date", ArrowDataType::Date32, false),
+ ArrowField::new("amount", ArrowDataType::Int64, false),
+ ArrowField::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ ArrowDataType::Struct(Fields::empty()),
+ false,
+ ),
+ ]));
+ let input = Arc::new(EmptyExec::new(arrow_schema));
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ assert!(
+ matches!(partitioning, Partitioning::RoundRobinBatch(_)),
+            "Should use round-robin for temporal transforms (Day) that don't provide good hash distribution"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_mixed_transforms_use_hash_partitioning() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "date",
+ Type::Primitive(PrimitiveType::Date),
+ )),
+ Arc::new(NestedField::required(
+ 2,
+ "user_id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::required(
+ 3,
+ "amount",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .add_partition_field("date", "date_day", Transform::Day)
+ .unwrap()
+ .add_partition_field("user_id", "user_id", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap();
+
+        let sort_order = iceberg::spec::SortOrder::builder().build(&schema).unwrap();
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/mixed_transforms_table".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ std::collections::HashMap::new(),
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+ let table = Table::builder()
+ .metadata(table_metadata.metadata)
+            .identifier(TableIdent::from_strs(["test", "mixed_transforms_table"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+            .metadata_location("/test/mixed_transforms_metadata.json".to_string())
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("date", ArrowDataType::Date32, false),
+ ArrowField::new("user_id", ArrowDataType::Int64, false),
+ ArrowField::new("amount", ArrowDataType::Int64, false),
+ ArrowField::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ ArrowDataType::Struct(Fields::empty()),
+ false,
+ ),
+ ]));
+ let input = Arc::new(EmptyExec::new(arrow_schema));
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ match partitioning {
+ Partitioning::Hash(exprs, _) => {
+                assert_eq!(exprs.len(), 1, "Should have one hash column (_partition)");
+ let column_names: Vec<String> = exprs
+ .iter()
+ .filter_map(|expr| {
+ expr.as_any()
+ .downcast_ref::<Column>()
+ .map(|col| col.name().to_string())
+ })
+ .collect();
+ assert!(
+                    column_names.contains(&PROJECTED_PARTITION_VALUE_COLUMN.to_string()),
+                    "Should use _partition column for mixed transforms with Identity, got: {:?}",
+ column_names
+ );
+ }
+            _ => panic!("Expected Hash partitioning for table with identity transforms"),
+ }
+ }
+
+ #[tokio::test]
+    async fn test_partition_column_with_temporal_transforms_uses_round_robin() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "event_time",
+ Type::Primitive(PrimitiveType::Timestamp),
+ )),
+ Arc::new(NestedField::required(
+ 2,
+ "amount",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .add_partition_field("event_time", "event_month", Transform::Month)
+ .unwrap()
+ .build()
+ .unwrap();
+
+        let sort_order = iceberg::spec::SortOrder::builder().build(&schema).unwrap();
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/temporal_partition".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ std::collections::HashMap::new(),
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+ let table = Table::builder()
+ .metadata(table_metadata.metadata)
+            .identifier(TableIdent::from_strs(["test", "temporal_partition"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .metadata_location("/test/temporal_metadata.json".to_string())
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new(
+ "event_time",
+ ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ ArrowField::new("amount", ArrowDataType::Int64, false),
+ ArrowField::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ ArrowDataType::Struct(Fields::empty()),
+ false,
+ ),
+ ]));
+ let input = Arc::new(EmptyExec::new(arrow_schema));
+
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ assert!(
+ matches!(partitioning, Partitioning::RoundRobinBatch(_)),
+            "Should use round-robin for _partition column with temporal transforms, not Hash"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_partition_column_with_identity_transforms_uses_hash() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "user_id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::required(
+ 2,
+ "amount",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+        let partition_spec = iceberg::spec::PartitionSpec::builder(schema.clone())
+ .add_partition_field("user_id", "user_id", Transform::Identity)
+ .unwrap()
+ .build()
+ .unwrap();
+
+        let sort_order = iceberg::spec::SortOrder::builder().build(&schema).unwrap();
+ let table_metadata_builder = iceberg::spec::TableMetadataBuilder::new(
+ schema,
+ partition_spec,
+ sort_order,
+ "/test/identity_partition".to_string(),
+ iceberg::spec::FormatVersion::V2,
+ std::collections::HashMap::new(),
+ )
+ .unwrap();
+
+ let table_metadata = table_metadata_builder.build().unwrap();
+ let table = Table::builder()
+ .metadata(table_metadata.metadata)
+            .identifier(TableIdent::from_strs(["test", "identity_partition"]).unwrap())
+ .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
+ .metadata_location("/test/identity_metadata.json".to_string())
+ .build()
+ .unwrap();
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("user_id", ArrowDataType::Int64, false),
+ ArrowField::new("amount", ArrowDataType::Int64, false),
+ ArrowField::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ ArrowDataType::Struct(Fields::empty()),
+ false,
+ ),
+ ]));
+ let input = Arc::new(EmptyExec::new(arrow_schema));
+
+ let repartitioned_plan = repartition(
+ input,
+ table.metadata_ref(),
+ std::num::NonZeroUsize::new(4).unwrap(),
+ )
+ .unwrap();
+
+        let partitioning = repartitioned_plan.properties().output_partitioning();
+ assert!(
+ matches!(partitioning, Partitioning::Hash(_, _)),
+            "Should use Hash partitioning for _partition column with Identity transforms"
+ );
+ }
+}