This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 8bda6d901 Extract only this Spark partition's PartitionedFiles in
planner.rs, and always execute partition 0. (#2675)
8bda6d901 is described below
commit 8bda6d901f293c464afc9b3fbc67825d0df51d0f
Author: Matt Butrovich <[email protected]>
AuthorDate: Sat Nov 1 09:48:18 2025 -0400
Extract only this Spark partition's PartitionedFiles in planner.rs, and
always execute partition 0. (#2675)
---
native/core/src/execution/jni_api.rs | 4 +++-
native/core/src/execution/planner.rs | 16 +++++-----------
.../test/scala/org/apache/comet/CometFuzzTestSuite.scala | 4 ----
.../scala/org/apache/comet/exec/CometJoinSuite.scala | 8 --------
4 files changed, 8 insertions(+), 24 deletions(-)
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index 8d76d2c0e..ce991d014 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -485,12 +485,14 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_executePlan(
}
let task_ctx = exec_context.session_ctx.task_ctx();
+ // Each Comet native execution corresponds to a single Spark
partition,
+ // so we should always execute partition 0.
let stream = exec_context
.root_op
.as_ref()
.unwrap()
.native_plan
- .execute(partition as usize, task_ctx)?;
+ .execute(0, task_ctx)?;
exec_context.stream = Some(stream);
} else {
// Pull input batches
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index c4ec83a6a..a37f928e9 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1317,17 +1317,11 @@ impl PhysicalPlanner {
&object_store_options,
)?;
- // Generate file groups
- let mut file_groups: Vec<Vec<PartitionedFile>> =
- Vec::with_capacity(partition_count);
- scan.file_partitions.iter().try_for_each(|partition| {
- let files = self.get_partitioned_files(partition)?;
- file_groups.push(files);
- Ok::<(), ExecutionError>(())
- })?;
-
- // TODO: I think we can remove partition_count in the future,
but leave for testing.
- assert_eq!(file_groups.len(), partition_count);
+ // Comet serializes all partitions' PartitionedFiles, but we
only want to read this
+ // Spark partition's PartitionedFiles
+ let files =
+
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+ let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
let partition_fields: Vec<Field> = partition_schema
.fields()
.iter()
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 8043b81b3..006112d2b 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -200,10 +200,6 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
}
test("join") {
- // TODO enable native_datafusion tests
- // https://github.com/apache/datafusion-comet/issues/2660
- assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
-
val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
df.createOrReplaceTempView("t2")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 010757d3a..d47b4e0c1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -74,10 +74,6 @@ class CometJoinSuite extends CometTestBase {
}
test("Broadcast HashJoin without join filter") {
- // TODO enable native_datafusion tests
- // https://github.com/apache/datafusion-comet/issues/2660
- assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
-
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> "100",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -105,10 +101,6 @@ class CometJoinSuite extends CometTestBase {
}
test("Broadcast HashJoin with join filter") {
- // TODO enable native_datafusion tests
- // https://github.com/apache/datafusion-comet/issues/2660
- assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() !=
CometConf.SCAN_NATIVE_DATAFUSION)
-
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> "100",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]