This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bda6d901 Extract only this Spark partition's PartitionedFiles in planner.rs, and always execute partition 0. (#2675)
8bda6d901 is described below

commit 8bda6d901f293c464afc9b3fbc67825d0df51d0f
Author: Matt Butrovich <[email protected]>
AuthorDate: Sat Nov 1 09:48:18 2025 -0400

    Extract only this Spark partition's PartitionedFiles in planner.rs, and always execute partition 0. (#2675)
---
 native/core/src/execution/jni_api.rs                     |  4 +++-
 native/core/src/execution/planner.rs                     | 16 +++++-----------
 .../test/scala/org/apache/comet/CometFuzzTestSuite.scala |  4 ----
 .../scala/org/apache/comet/exec/CometJoinSuite.scala     |  8 --------
 4 files changed, 8 insertions(+), 24 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 8d76d2c0e..ce991d014 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -485,12 +485,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
                 }
 
                 let task_ctx = exec_context.session_ctx.task_ctx();
+                // Each Comet native execution corresponds to a single Spark partition,
+                // so we should always execute partition 0.
                 let stream = exec_context
                     .root_op
                     .as_ref()
                     .unwrap()
                     .native_plan
-                    .execute(partition as usize, task_ctx)?;
+                    .execute(0, task_ctx)?;
                 exec_context.stream = Some(stream);
             } else {
                 // Pull input batches
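
The jni_api.rs change relies on an invariant established in planner.rs below: each native plan is built from exactly one Spark partition's files, so it exposes a single output partition and 0 is the only valid index to execute. A minimal standalone sketch of that invariant (plain Rust, not the actual DataFusion ExecutionPlan API; the NativeScan type and file names are illustrative only):

    // Toy stand-in for a scan whose output partitions are its file groups.
    struct NativeScan {
        file_groups: Vec<Vec<String>>, // one group per output partition
    }

    impl NativeScan {
        fn execute(&self, partition: usize) -> Result<&[String], String> {
            self.file_groups
                .get(partition)
                .map(|group| group.as_slice())
                .ok_or_else(|| format!("partition {partition} out of range"))
        }
    }

    fn main() {
        // Built from a single Spark partition's files, as planner.rs now does.
        let scan = NativeScan {
            file_groups: vec![vec!["part-3.snappy.parquet".to_string()]],
        };
        assert!(scan.execute(0).is_ok());  // the only valid index for this plan
        assert!(scan.execute(3).is_err()); // the raw Spark partition index would be out of range
    }
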
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index c4ec83a6a..a37f928e9 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1317,17 +1317,11 @@ impl PhysicalPlanner {
                     &object_store_options,
                 )?;
 
-                // Generate file groups
-                let mut file_groups: Vec<Vec<PartitionedFile>> =
-                    Vec::with_capacity(partition_count);
-                scan.file_partitions.iter().try_for_each(|partition| {
-                    let files = self.get_partitioned_files(partition)?;
-                    file_groups.push(files);
-                    Ok::<(), ExecutionError>(())
-                })?;
-
-                // TODO: I think we can remove partition_count in the future, but leave for testing.
-                assert_eq!(file_groups.len(), partition_count);
+                // Comet serializes all partitions' PartitionedFiles, but we only want to read this
+                // Spark partition's PartitionedFiles
+                let files =
+                    self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+                let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
                 let partition_fields: Vec<Field> = partition_schema
                     .fields()
                     .iter()
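
For context on the planner.rs hunk above: instead of expanding every Spark partition's files into one file group per partition, the planner now slices out only the current partition's files and wraps them in a single group. A simplified, self-contained sketch of that selection (plain Rust, with Vec<String> standing in for Vec<PartitionedFile>; the function and variable names are illustrative, not the actual planner API):

    /// Keep only the files for the Spark partition this plan instance serves.
    /// Returning a single group means the resulting scan has exactly one
    /// output partition, which is why jni_api.rs can always call execute(0, ...).
    fn file_groups_for_partition(
        all_partitions: &[Vec<String>],
        partition: usize,
    ) -> Vec<Vec<String>> {
        vec![all_partitions[partition].clone()]
    }

    fn main() {
        let all = vec![
            vec!["part-0.parquet".to_string()],
            vec!["part-1a.parquet".to_string(), "part-1b.parquet".to_string()],
        ];
        // A planner instance created for Spark partition 1 keeps only its files.
        let groups = file_groups_for_partition(&all, 1);
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].len(), 2);
    }
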
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 8043b81b3..006112d2b 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -200,10 +200,6 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }
 
   test("join") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     df.createOrReplaceTempView("t2")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index 010757d3a..d47b4e0c1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -74,10 +74,6 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin without join filter") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -105,10 +101,6 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin with join filter") {
-    // TODO enable native_datafusion tests
-    // https://github.com/apache/datafusion-comet/issues/2660
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
