This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c307b7dc chore: remove CopyExec (#2663)
2c307b7dc is described below

commit 2c307b7dc930051560189dc9ba0e1356519f5fe4
Author: Andy Grove <[email protected]>
AuthorDate: Wed Oct 29 18:43:47 2025 -0600

    chore: remove CopyExec (#2663)
---
 native/core/src/execution/operators/copy.rs        | 220 +--------------------
 native/core/src/execution/planner.rs               |  40 ++--
 native/core/src/execution/spark_plan.rs            |  19 +-
 .../org/apache/comet/CometFuzzTestSuite.scala      |   4 +
 .../org/apache/comet/exec/CometJoinSuite.scala     |   8 +
 5 files changed, 28 insertions(+), 263 deletions(-)

diff --git a/native/core/src/execution/operators/copy.rs 
b/native/core/src/execution/operators/copy.rs
index 950be36b6..193c385c3 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -16,38 +16,11 @@
 // under the License.
 
 use arrow::compute::{cast_with_options, CastOptions};
-use futures::{Stream, StreamExt};
-use std::{
-    any::Any,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
+use std::sync::Arc;
 
-use arrow::array::{
-    downcast_dictionary_array, make_array, Array, ArrayRef, MutableArrayData, 
RecordBatch,
-    RecordBatchOptions,
-};
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::array::{downcast_dictionary_array, make_array, Array, ArrayRef, 
MutableArrayData};
+use arrow::datatypes::DataType;
 use arrow::error::ArrowError;
-use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as 
DataFusionResult};
-use datafusion::physical_plan::execution_plan::{Boundedness, 
CardinalityEffect, EmissionType};
-use datafusion::physical_plan::metrics::{BaselineMetrics, 
ExecutionPlanMetricsSet, MetricsSet};
-use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
-
-/// An utility execution node which makes deep copies of input batches.
-///
-/// In certain scenarios like sort, DF execution nodes only make shallow copy 
of input batches.
-/// This could cause issues for Comet, since we re-use column vectors across 
different batches.
-/// For those scenarios, this can be used as an adapter node.
-#[derive(Debug)]
-pub struct CopyExec {
-    input: Arc<dyn ExecutionPlan>,
-    schema: SchemaRef,
-    cache: PlanProperties,
-    metrics: ExecutionPlanMetricsSet,
-    mode: CopyMode,
-}
 
 #[derive(Debug, PartialEq, Clone)]
 pub enum CopyMode {
@@ -57,193 +30,6 @@ pub enum CopyMode {
     UnpackOrClone,
 }
 
-impl CopyExec {
-    pub fn new(input: Arc<dyn ExecutionPlan>, mode: CopyMode) -> Self {
-        // change schema to remove dictionary types because CopyExec always 
unpacks
-        // dictionaries
-
-        let fields: Vec<Field> = input
-            .schema()
-            .fields
-            .iter()
-            .map(|f: &FieldRef| match f.data_type() {
-                DataType::Dictionary(_, value_type) => {
-                    Field::new(f.name(), value_type.as_ref().clone(), 
f.is_nullable())
-                }
-                _ => f.as_ref().clone(),
-            })
-            .collect();
-
-        let schema = Arc::new(Schema::new(fields));
-
-        let cache = PlanProperties::new(
-            EquivalenceProperties::new(Arc::clone(&schema)),
-            Partitioning::UnknownPartitioning(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        );
-
-        Self {
-            input,
-            schema,
-            cache,
-            metrics: ExecutionPlanMetricsSet::default(),
-            mode,
-        }
-    }
-
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    pub fn mode(&self) -> &CopyMode {
-        &self.mode
-    }
-}
-
-impl DisplayAs for CopyExec {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> 
std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CopyExec [{:?}]", self.mode)
-            }
-            DisplayFormatType::TreeRender => unimplemented!(),
-        }
-    }
-}
-
-impl ExecutionPlan for CopyExec {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        vec![&self.input]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let input = Arc::clone(&self.input);
-        let new_input = input.with_new_children(children)?;
-        Ok(Arc::new(CopyExec {
-            input: new_input,
-            schema: Arc::clone(&self.schema),
-            cache: self.cache.clone(),
-            metrics: self.metrics.clone(),
-            mode: self.mode.clone(),
-        }))
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> DataFusionResult<SendableRecordBatchStream> {
-        let child_stream = self.input.execute(partition, context)?;
-        Ok(Box::pin(CopyStream::new(
-            self,
-            self.schema(),
-            child_stream,
-            partition,
-            self.mode.clone(),
-        )))
-    }
-
-    fn partition_statistics(&self, partition: Option<usize>) -> 
DataFusionResult<Statistics> {
-        self.input.partition_statistics(partition)
-    }
-
-    fn properties(&self) -> &PlanProperties {
-        &self.cache
-    }
-
-    fn name(&self) -> &str {
-        "CopyExec"
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics.clone_inner())
-    }
-
-    fn maintains_input_order(&self) -> Vec<bool> {
-        vec![true; self.children().len()]
-    }
-
-    fn supports_limit_pushdown(&self) -> bool {
-        true
-    }
-
-    fn cardinality_effect(&self) -> CardinalityEffect {
-        CardinalityEffect::Equal
-    }
-}
-
-struct CopyStream {
-    schema: SchemaRef,
-    child_stream: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
-    mode: CopyMode,
-}
-
-impl CopyStream {
-    fn new(
-        exec: &CopyExec,
-        schema: SchemaRef,
-        child_stream: SendableRecordBatchStream,
-        partition: usize,
-        mode: CopyMode,
-    ) -> Self {
-        Self {
-            schema,
-            child_stream,
-            baseline_metrics: BaselineMetrics::new(&exec.metrics, partition),
-            mode,
-        }
-    }
-
-    // TODO: replace copy_or_cast_array with copy_array if upstream sort 
kernel fixes
-    // dictionary array sorting issue.
-    fn copy(&self, batch: RecordBatch) -> DataFusionResult<RecordBatch> {
-        let mut timer = self.baseline_metrics.elapsed_compute().timer();
-        let vectors = batch
-            .columns()
-            .iter()
-            .map(|v| copy_or_unpack_array(v, &self.mode))
-            .collect::<Result<Vec<ArrayRef>, _>>()?;
-
-        let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-        let maybe_batch =
-            RecordBatch::try_new_with_options(Arc::clone(&self.schema), 
vectors, &options)
-                .map_err(|e| arrow_datafusion_err!(e));
-        timer.stop();
-        self.baseline_metrics.record_output(batch.num_rows());
-        maybe_batch
-    }
-}
-
-impl Stream for CopyStream {
-    type Item = DataFusionResult<RecordBatch>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
-        self.child_stream.poll_next_unpin(cx).map(|x| match x {
-            Some(Ok(batch)) => Some(self.copy(batch)),
-            other => other,
-        })
-    }
-}
-
-impl RecordBatchStream for CopyStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
 /// Copy an Arrow Array
 pub(crate) fn copy_array(array: &dyn Array) -> ArrayRef {
     let capacity = array.len();
diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index ca6e2084e..c4ec83a6a 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -17,12 +17,11 @@
 
 //! Converts Spark physical plan to DataFusion physical plan
 
-use crate::execution::operators::CopyMode;
 use crate::{
     errors::ExpressionError,
     execution::{
         expressions::subquery::Subquery,
-        operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
+        operators::{ExecutionError, ExpandExec, ScanExec},
         serde::to_arrow_datatype,
         shuffle::ShuffleWriterExec,
     },
@@ -1222,15 +1221,13 @@ impl PhysicalPlanner {
                     .collect();
 
                 let fetch = sort.fetch.map(|num| num as usize);
-                // SortExec caches batches so we need to make a copy of 
incoming batches. Also,
-                // SortExec fails in some cases if we do not unpack 
dictionary-encoded arrays, and
-                // it would be more efficient if we could avoid that.
-                // https://github.com/apache/datafusion-comet/issues/963
-                let child_copied = 
Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
 
                 let mut sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
-                    SortExec::new(LexOrdering::new(exprs?).unwrap(), 
Arc::clone(&child_copied))
-                        .with_fetch(fetch),
+                    SortExec::new(
+                        LexOrdering::new(exprs?).unwrap(),
+                        Arc::clone(&child.native_plan),
+                    )
+                    .with_fetch(fetch),
                 );
 
                 if let Some(skip) = sort.skip.filter(|&n| n > 0).map(|n| n as 
usize) {
@@ -1394,13 +1391,9 @@ impl PhysicalPlanner {
                 assert_eq!(children.len(), 1);
                 let (scans, child) = self.create_plan(&children[0], inputs, 
partition_count)?;
 
-                // We wrap native shuffle in a CopyExec. This existed 
previously, but for
-                // RangePartitioning at least we want to ensure that 
dictionaries are unpacked.
-                let wrapped_child = 
Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
-
                 let partitioning = self.create_partitioning(
                     writer.partitioning.as_ref().unwrap(),
-                    wrapped_child.schema(),
+                    child.native_plan.schema(),
                 )?;
 
                 let codec = match writer.codec.try_into() {
@@ -1417,7 +1410,7 @@ impl PhysicalPlanner {
                 }?;
 
                 let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
-                    wrapped_child,
+                    Arc::clone(&child.native_plan),
                     partitioning,
                     codec,
                     writer.output_data_file.clone(),
@@ -1507,8 +1500,8 @@ impl PhysicalPlanner {
                     })
                     .collect();
 
-                let left = 
Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan));
-                let right = 
Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan));
+                let left = Arc::clone(&join_params.left.native_plan);
+                let right = Arc::clone(&join_params.right.native_plan);
 
                 let join = Arc::new(SortMergeJoinExec::try_new(
                     Arc::clone(&left),
@@ -1570,12 +1563,8 @@ impl PhysicalPlanner {
                     partition_count,
                 )?;
 
-                // HashJoinExec may cache the input batch internally. We need
-                // to copy the input batch to avoid the data corruption from 
reusing the input
-                // batch. We also need to unpack dictionary arrays, because 
the join operators
-                // do not support them.
-                let left = 
Self::wrap_in_copy_exec(Arc::clone(&join_params.left.native_plan));
-                let right = 
Self::wrap_in_copy_exec(Arc::clone(&join_params.right.native_plan));
+                let left = Arc::clone(&join_params.left.native_plan);
+                let right = Arc::clone(&join_params.right.native_plan);
 
                 let hash_join = Arc::new(HashJoinExec::try_new(
                     left,
@@ -1805,11 +1794,6 @@ impl PhysicalPlanner {
         ))
     }
 
-    /// Wrap an ExecutionPlan in a CopyExec, which will unpack any 
dictionary-encoded arrays.
-    fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
-        Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
-    }
-
     /// Create a DataFusion physical aggregate expression from Spark physical 
aggregate expression
     fn create_agg_expr(
         &self,
diff --git a/native/core/src/execution/spark_plan.rs 
b/native/core/src/execution/spark_plan.rs
index 333429262..2487c76a1 100644
--- a/native/core/src/execution/spark_plan.rs
+++ b/native/core/src/execution/spark_plan.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::operators::CopyExec;
 use arrow::datatypes::SchemaRef;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
@@ -43,15 +42,11 @@ impl SparkPlan {
         native_plan: Arc<dyn ExecutionPlan>,
         children: Vec<Arc<SparkPlan>>,
     ) -> Self {
-        let mut additional_native_plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
-        for child in &children {
-            collect_additional_plans(Arc::clone(&child.native_plan), &mut 
additional_native_plans);
-        }
         Self {
             plan_id,
             native_plan,
             children,
-            additional_native_plans,
+            additional_native_plans: vec![],
         }
     }
 
@@ -66,9 +61,6 @@ impl SparkPlan {
         for plan in &additional_native_plans {
             accum.push(Arc::clone(plan));
         }
-        for child in &children {
-            collect_additional_plans(Arc::clone(&child.native_plan), &mut 
accum);
-        }
         Self {
             plan_id,
             native_plan,
@@ -87,12 +79,3 @@ impl SparkPlan {
         &self.children
     }
 }
-
-fn collect_additional_plans(
-    child: Arc<dyn ExecutionPlan>,
-    additional_native_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
-) {
-    if child.as_any().is::<CopyExec>() {
-        additional_native_plans.push(Arc::clone(&child));
-    }
-}
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 006112d2b..8043b81b3 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -200,6 +200,10 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
   }
 
   test("join") {
+    // TODO enable native_datafusion tests
+    // https://github.com/apache/datafusion-comet/issues/2660
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != 
CometConf.SCAN_NATIVE_DATAFUSION)
+
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     df.createOrReplaceTempView("t2")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index d47b4e0c1..010757d3a 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -74,6 +74,10 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin without join filter") {
+    // TODO enable native_datafusion tests
+    // https://github.com/apache/datafusion-comet/issues/2660
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != 
CometConf.SCAN_NATIVE_DATAFUSION)
+
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -101,6 +105,10 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin with join filter") {
+    // TODO enable native_datafusion tests
+    // https://github.com/apache/datafusion-comet/issues/2660
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != 
CometConf.SCAN_NATIVE_DATAFUSION)
+
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to