This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new d0aa1ffda perf: Add Comet config for native Iceberg reader's data file
concurrency (#3584)
d0aa1ffda is described below
commit d0aa1ffdaed1e27bb26427c3aba9ca583233df40
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Feb 24 19:44:45 2026 -0500
perf: Add Comet config for native Iceberg reader's data file concurrency
(#3584)
---
common/src/main/scala/org/apache/comet/CometConf.scala | 11 +++++++++++
native/core/src/execution/operators/iceberg_scan.rs | 6 +++++-
native/core/src/execution/planner.rs | 2 ++
native/proto/src/proto/operator.proto | 3 +++
.../apache/comet/serde/operator/CometIcebergNativeScan.scala | 4 +++-
5 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 480eafdcb..5ee777f3d 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -148,6 +148,17 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
+ conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
+ .category(CATEGORY_SCAN)
+ .doc(
+ "The number of Iceberg data files to read concurrently within a single task. " +
+ "Higher values improve throughput for tables with many small files by overlapping " +
+ "I/O latency, but increase memory usage. Values between 2 and 8 are suggested.")
+ .intConf
+ .checkValue(v => v > 0, "Data file concurrency limit must be positive")
+ .createWithDefault(1)
+
val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.csv.v2.enabled")
.category(CATEGORY_TESTING)
diff --git a/native/core/src/execution/operators/iceberg_scan.rs
b/native/core/src/execution/operators/iceberg_scan.rs
index bc20592e9..39ce25002 100644
--- a/native/core/src/execution/operators/iceberg_scan.rs
+++ b/native/core/src/execution/operators/iceberg_scan.rs
@@ -61,6 +61,8 @@ pub struct IcebergScanExec {
catalog_properties: HashMap<String, String>,
/// Pre-planned file scan tasks
tasks: Vec<FileScanTask>,
+ /// Number of data files to read concurrently
+ data_file_concurrency_limit: usize,
/// Metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -71,6 +73,7 @@ impl IcebergScanExec {
schema: SchemaRef,
catalog_properties: HashMap<String, String>,
tasks: Vec<FileScanTask>,
+ data_file_concurrency_limit: usize,
) -> Result<Self, ExecutionError> {
let output_schema = schema;
let plan_properties =
Self::compute_properties(Arc::clone(&output_schema), 1);
@@ -83,6 +86,7 @@ impl IcebergScanExec {
plan_properties,
catalog_properties,
tasks,
+ data_file_concurrency_limit,
metrics,
})
}
@@ -158,7 +162,7 @@ impl IcebergScanExec {
let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
.with_batch_size(batch_size)
- .with_data_file_concurrency_limit(context.session_config().target_partitions())
+ .with_data_file_concurrency_limit(self.data_file_concurrency_limit)
.with_row_selection_enabled(true)
.build();
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index f84d6cc59..ef81cdfbf 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1176,12 +1176,14 @@ impl PhysicalPlanner {
.collect();
let metadata_location = common.metadata_location.clone();
let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?;
+ let data_file_concurrency_limit = common.data_file_concurrency_limit as usize;
let iceberg_scan = IcebergScanExec::new(
metadata_location,
required_schema,
catalog_properties,
tasks,
+ data_file_concurrency_limit,
)?;
Ok((
diff --git a/native/proto/src/proto/operator.proto
b/native/proto/src/proto/operator.proto
index 93872b462..bf2752bdd 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -175,6 +175,9 @@ message IcebergScanCommon {
repeated PartitionData partition_data_pool = 9;
repeated DeleteFileList delete_files_pool = 10;
repeated spark.spark_expression.Expr residual_pool = 11;
+
+ // Number of data files to read concurrently within a single task
+ uint32 data_file_concurrency_limit = 12;
}
message IcebergScan {
diff --git
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index c86b2a51b..9f1a01599 100644
---
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec,
CometNativeExec}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceRDD, DataSourceRDDPartition}
import org.apache.spark.sql.types._
-import org.apache.comet.ConfigEntry
+import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata,
IcebergReflection}
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.ExprOuterClass.Expr
@@ -757,6 +757,8 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
var totalTasks = 0
commonBuilder.setMetadataLocation(metadata.metadataLocation)
+ commonBuilder.setDataFileConcurrencyLimit(
+ CometConf.COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT.get())
metadata.catalogProperties.foreach { case (key, value) =>
commonBuilder.putCatalogProperties(key, value)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]