This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b0e6db77 chore: Simplify approach to avoiding memory corruption due to buffer reuse (#2156)
2b0e6db77 is described below

commit 2b0e6db77b2c27bfa91eb34d0a2469d1014c20e6
Author: Andy Grove <agr...@apache.org>
AuthorDate: Fri Aug 15 08:28:27 2025 -0600

    chore: Simplify approach to avoiding memory corruption due to buffer reuse (#2156)
---
 native/core/Cargo.toml                             |   4 -
 native/core/benches/filter.rs                      | 112 ----
 native/core/src/execution/operators/copy.rs        |   2 +-
 native/core/src/execution/operators/filter.rs      | 574 ---------------------
 native/core/src/execution/operators/mod.rs         |   3 -
 native/core/src/execution/operators/scan.rs        |  11 +-
 native/core/src/execution/planner.rs               |  80 +--
 native/proto/src/proto/operator.proto              |   1 -
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  17 -
 9 files changed, 27 insertions(+), 777 deletions(-)
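A note on the approach: this commit deletes Comet's forked `FilterExec` (which always deep-copied its output) and the `can_reuse_input_batch` plan walk, and instead has `ScanExec` deep-copy every array as it is imported over JNI. The hazard being removed is buffer aliasing: cloning an `ArrayRef` only bumps a reference count, so if the producer later recycles the underlying buffer, every downstream handle sees the new bytes. A minimal sketch of that aliasing (the `Int32Array` input and `main` harness are illustrative, not code from this commit):

    use arrow::array::{Array, ArrayRef, Int32Array};
    use std::sync::Arc;

    fn main() {
        let produced: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

        // Cloning an ArrayRef only bumps a reference count: both handles
        // point at the very same underlying value buffer.
        let downstream = Arc::clone(&produced);
        let p1 = produced.to_data().buffers()[0].as_ptr();
        let p2 = downstream.to_data().buffers()[0].as_ptr();
        assert_eq!(p1, p2);

        // Safe Rust cannot mutate that shared buffer, but a JNI/FFI producer
        // that owns the allocation can reuse it after the handoff, which is
        // the corruption this commit guards against by copying at the scan.
    }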

diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 92b7da8e5..5bd62a890 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -119,7 +119,3 @@ harness = false
 [[bench]]
 name = "parquet_decode"
 harness = false
-
-[[bench]]
-name = "filter"
-harness = false
diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs
deleted file mode 100644
index 82fa4aac6..000000000
--- a/native/core/benches/filter.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder};
-
-use arrow::array::builder::{BooleanBuilder, Int32Builder, StringBuilder};
-use arrow::array::{ArrayRef, RecordBatch};
-use arrow::compute::filter_record_batch;
-use arrow::datatypes::{DataType, Field, Schema};
-use comet::execution::operators::comet_filter_record_batch;
-use criterion::{criterion_group, criterion_main, Criterion};
-use std::hint::black_box;
-use std::sync::Arc;
-use std::time::Duration;
-
-fn criterion_benchmark(c: &mut Criterion) {
-    let mut group = c.benchmark_group("filter");
-
-    let num_rows = 8192;
-    let num_int_cols = 4;
-    let num_string_cols = 4;
-
-    let batch = create_record_batch(num_rows, num_int_cols, num_string_cols);
-
-    // create some different predicates
-    let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows);
-    let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows);
-    let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows);
-    for i in 0..num_rows {
-        predicate_select_few.append_value(i % 10 == 0);
-        predicate_select_many.append_value(i % 10 > 0);
-        predicate_select_all.append_value(true);
-    }
-    let predicate_select_few = predicate_select_few.finish();
-    let predicate_select_many = predicate_select_many.finish();
-    let predicate_select_all = predicate_select_all.finish();
-
-    // baseline uses Arrow's filter_record_batch method
-    group.bench_function("arrow_filter_record_batch - few rows selected", |b| {
-        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few)))
-    });
-    group.bench_function("arrow_filter_record_batch - many rows selected", |b| {
-        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many)))
-    });
-    group.bench_function("arrow_filter_record_batch - all rows selected", |b| {
-        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all)))
-    });
-
-    group.bench_function("comet_filter_record_batch - few rows selected", |b| {
-        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few)))
-    });
-    group.bench_function("comet_filter_record_batch - many rows selected", |b| {
-        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many)))
-    });
-    group.bench_function("comet_filter_record_batch - all rows selected", |b| {
-        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all)))
-    });
-
-    group.finish();
-}
-
-fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch {
-    let mut int32_builder = Int32Builder::with_capacity(num_rows);
-    let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
-    for i in 0..num_rows {
-        int32_builder.append_value(i as i32);
-        string_builder.append_value(format!("this is string #{i}"));
-    }
-    let int32_array = Arc::new(int32_builder.finish());
-    let string_array = Arc::new(string_builder.finish());
-
-    let mut fields = vec![];
-    let mut columns: Vec<ArrayRef> = vec![];
-    let mut i = 0;
-    for _ in 0..num_int_cols {
-        fields.push(Field::new(format!("c{i}"), DataType::Int32, false));
-        columns.push(int32_array.clone()); // note this is just copying a reference to the array
-        i += 1;
-    }
-    for _ in 0..num_string_cols {
-        fields.push(Field::new(format!("c{i}"), DataType::Utf8, false));
-        columns.push(string_array.clone()); // note this is just copying a reference to the array
-        i += 1;
-    }
-    let schema = Schema::new(fields);
-    RecordBatch::try_new(Arc::new(schema), columns).unwrap()
-}
-
-fn config() -> Criterion {
-    Criterion::default()
-        .measurement_time(Duration::from_millis(500))
-        .warm_up_time(Duration::from_millis(500))
-}
-
-criterion_group! {
-    name = benches;
-    config = config();
-    targets = criterion_benchmark
-}
-criterion_main!(benches);
diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index f1e87c2e0..950be36b6 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -245,7 +245,7 @@ impl RecordBatchStream for CopyStream {
 }
 
 /// Copy an Arrow Array
-fn copy_array(array: &dyn Array) -> ArrayRef {
+pub(crate) fn copy_array(array: &dyn Array) -> ArrayRef {
     let capacity = array.len();
     let data = array.to_data();
 
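The body of `copy_array` is unchanged by this hunk; only its visibility widens to `pub(crate)` so that `scan.rs` can call it. For reference, a sketch of the `MutableArrayData` deep-copy idiom it relies on, the same idiom visible in the deleted `comet_filter_record_batch` further down; the `deep_copy` name and `main` harness are illustrative:

    use arrow::array::{make_array, Array, ArrayRef, Int32Array, MutableArrayData};
    use std::sync::Arc;

    /// Deep-copy an array into freshly allocated buffers so the result no
    /// longer aliases the caller's memory.
    fn deep_copy(array: &dyn Array) -> ArrayRef {
        let data = array.to_data();
        // MutableArrayData allocates new buffers and copies the source range
        // in; freeze() turns the result back into immutable ArrayData.
        let mut mutable = MutableArrayData::new(vec![&data], false, array.len());
        mutable.extend(0, 0, array.len());
        make_array(mutable.freeze())
    }

    fn main() {
        let original: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let copied = deep_copy(original.as_ref());
        // Value-equal, but backed by different allocations.
        assert_eq!(original.to_data(), copied.to_data());
        assert_ne!(
            original.to_data().buffers()[0].as_ptr(),
            copied.to_data().buffers()[0].as_ptr()
        );
    }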
diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs
deleted file mode 100644
index 272bbd4d8..000000000
--- a/native/core/src/execution/operators/filter.rs
+++ /dev/null
@@ -1,574 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{ready, Context, Poll};
-
-use datafusion::physical_plan::{
-    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
-    ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
-    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
-};
-
-use arrow::array::{
-    make_array, Array, ArrayRef, BooleanArray, MutableArrayData, RecordBatchOptions,
-};
-use arrow::compute::filter_record_batch;
-use arrow::datatypes::{DataType, SchemaRef};
-use arrow::error::ArrowError;
-use arrow::record_batch::RecordBatch;
-use datafusion::common::cast::as_boolean_array;
-use datafusion::common::stats::Precision;
-use datafusion::common::{
-    internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
-};
-use datafusion::execution::TaskContext;
-use datafusion::logical_expr::Operator;
-use datafusion::physical_expr::equivalence::ProjectionMapping;
-use datafusion::physical_expr::expressions::BinaryExpr;
-use datafusion::physical_expr::intervals::utils::check_support;
-use datafusion::physical_expr::utils::collect_columns;
-use datafusion::physical_expr::{
-    analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries,
-    PhysicalExpr,
-};
-use datafusion::physical_plan::common::can_project;
-use datafusion::physical_plan::execution_plan::CardinalityEffect;
-use futures::stream::{Stream, StreamExt};
-use log::trace;
-
-/// This is a copy of DataFusion's FilterExec with one modification to ensure that input
-/// batches are never passed through unchanged. The changes are between the comments
-/// `BEGIN Comet change` and `END Comet change`.
-#[derive(Debug, Clone)]
-pub struct FilterExec {
-    /// The expression to filter on. This expression must evaluate to a boolean value.
-    predicate: Arc<dyn PhysicalExpr>,
-    /// The input plan
-    input: Arc<dyn ExecutionPlan>,
-    /// Execution metrics
-    metrics: ExecutionPlanMetricsSet,
-    /// Selectivity for statistics. 0 = no rows, 100 = all rows
-    default_selectivity: u8,
-    /// Properties equivalence properties, partitioning, etc.
-    cache: PlanProperties,
-    /// The projection indices of the columns in the output schema of join
-    projection: Option<Vec<usize>>,
-}
-
-impl FilterExec {
-    /// Create a FilterExec on an input
-    pub fn try_new(
-        predicate: Arc<dyn PhysicalExpr>,
-        input: Arc<dyn ExecutionPlan>,
-    ) -> Result<Self> {
-        match predicate.data_type(input.schema().as_ref())? {
-            DataType::Boolean => {
-                let default_selectivity = 20;
-                let cache =
-                    Self::compute_properties(&input, &predicate, default_selectivity, None)?;
-                Ok(Self {
-                    predicate,
-                    input: Arc::clone(&input),
-                    metrics: ExecutionPlanMetricsSet::new(),
-                    default_selectivity,
-                    cache,
-                    projection: None,
-                })
-            }
-            other => {
-                plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
-            }
-        }
-    }
-
-    pub fn with_default_selectivity(
-        mut self,
-        default_selectivity: u8,
-    ) -> Result<Self, DataFusionError> {
-        if default_selectivity > 100 {
-            return plan_err!(
-                "Default filter selectivity value needs to be less than or 
equal to 100"
-            );
-        }
-        self.default_selectivity = default_selectivity;
-        Ok(self)
-    }
-
-    /// Return new instance of [FilterExec] with the given projection.
-    pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
-        //  Check if the projection is valid
-        can_project(&self.schema(), projection.as_ref())?;
-
-        let projection = match projection {
-            Some(projection) => match &self.projection {
-                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
-                None => Some(projection),
-            },
-            None => None,
-        };
-
-        let cache = Self::compute_properties(
-            &self.input,
-            &self.predicate,
-            self.default_selectivity,
-            projection.as_ref(),
-        )?;
-        Ok(Self {
-            predicate: Arc::clone(&self.predicate),
-            input: Arc::clone(&self.input),
-            metrics: self.metrics.clone(),
-            default_selectivity: self.default_selectivity,
-            cache,
-            projection,
-        })
-    }
-
-    /// The expression to filter on. This expression must evaluate to a boolean value.
-    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
-        &self.predicate
-    }
-
-    /// The input plan
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    /// The default selectivity
-    pub fn default_selectivity(&self) -> u8 {
-        self.default_selectivity
-    }
-
-    /// Projection
-    pub fn projection(&self) -> Option<&Vec<usize>> {
-        self.projection.as_ref()
-    }
-
-    /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
-    fn statistics_helper(
-        input: &Arc<dyn ExecutionPlan>,
-        predicate: &Arc<dyn PhysicalExpr>,
-        default_selectivity: u8,
-    ) -> Result<Statistics> {
-        let input_stats = input.partition_statistics(None)?;
-        let schema = input.schema();
-        if !check_support(predicate, &schema) {
-            let selectivity = default_selectivity as f64 / 100.0;
-            let mut stats = input_stats.to_inexact();
-            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
-            stats.total_byte_size = stats
-                .total_byte_size
-                .with_estimated_selectivity(selectivity);
-            return Ok(stats);
-        }
-
-        let num_rows = input_stats.num_rows;
-        let total_byte_size = input_stats.total_byte_size;
-        let input_analysis_ctx =
-            AnalysisContext::try_from_statistics(&input.schema(), &input_stats.column_statistics)?;
-
-        let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
-
-        // Estimate (inexact) selectivity of predicate
-        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
-        let num_rows = num_rows.with_estimated_selectivity(selectivity);
-        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
-
-        let column_statistics =
-            collect_new_statistics(&input_stats.column_statistics, analysis_ctx.boundaries);
-        Ok(Statistics {
-            num_rows,
-            total_byte_size,
-            column_statistics,
-        })
-    }
-
-    fn extend_constants(
-        input: &Arc<dyn ExecutionPlan>,
-        predicate: &Arc<dyn PhysicalExpr>,
-    ) -> Vec<ConstExpr> {
-        let mut res_constants = Vec::new();
-        let input_eqs = input.equivalence_properties();
-
-        let conjunctions = split_conjunction(predicate);
-        for conjunction in conjunctions {
-            if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
-                if binary.op() == &Operator::Eq {
-                    // Filter evaluates to single value for all partitions
-                    if input_eqs.is_expr_constant(binary.left()).is_some() {
-                        let across = input_eqs
-                            .is_expr_constant(binary.right())
-                            .unwrap_or_default();
-                        res_constants.push(ConstExpr::new(Arc::clone(binary.right()), across));
-                    } else if input_eqs.is_expr_constant(binary.right()).is_some() {
-                        let across = input_eqs
-                            .is_expr_constant(binary.left())
-                            .unwrap_or_default();
-                        res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
-                    }
-                }
-            }
-        }
-        res_constants
-    }
-    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
-    fn compute_properties(
-        input: &Arc<dyn ExecutionPlan>,
-        predicate: &Arc<dyn PhysicalExpr>,
-        default_selectivity: u8,
-        projection: Option<&Vec<usize>>,
-    ) -> Result<PlanProperties> {
-        // Combine the equal predicates with the input equivalence properties
-        // to construct the equivalence properties:
-        let stats = Self::statistics_helper(input, predicate, default_selectivity)?;
-        let mut eq_properties = input.equivalence_properties().clone();
-        let (equal_pairs, _) = collect_columns_from_predicate(predicate);
-        for (lhs, rhs) in equal_pairs {
-            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
-        }
-        // Add the columns that have only one viable value (singleton) after
-        // filtering to constants.
-        let constants = collect_columns(predicate)
-            .into_iter()
-            .filter(|column| stats.column_statistics[column.index()].is_singleton())
-            .map(|column| {
-                let value = stats.column_statistics[column.index()]
-                    .min_value
-                    .get_value();
-                let expr = Arc::new(column) as _;
-                ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
-            });
-        // This is for statistics
-        eq_properties.add_constants(constants)?;
-        // This is for logical constant (for example: a = '1', then a could be marked as a constant)
-        // to do: how to deal with a multiple situation to represent = (for example, c1 between 0 and 0)
-        eq_properties.add_constants(Self::extend_constants(input, predicate))?;
-
-        let mut output_partitioning = input.output_partitioning().clone();
-        // If contains projection, update the PlanProperties.
-        if let Some(projection) = projection {
-            let schema = eq_properties.schema();
-            let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
-            let out_schema = project_schema(schema, Some(projection))?;
-            output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties);
-            eq_properties = eq_properties.project(&projection_mapping, out_schema);
-        }
-
-        Ok(PlanProperties::new(
-            eq_properties,
-            output_partitioning,
-            input.pipeline_behavior(),
-            input.boundedness(),
-        ))
-    }
-}
-
-impl DisplayAs for FilterExec {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let display_projections = if let Some(projection) = self.projection.as_ref() {
-                    format!(
-                        ", projection=[{}]",
-                        projection
-                            .iter()
-                            .map(|index| format!(
-                                "{}@{}",
-                                self.input.schema().fields().get(*index).unwrap().name(),
-                                index
-                            ))
-                            .collect::<Vec<_>>()
-                            .join(", ")
-                    )
-                } else {
-                    "".to_string()
-                };
-                write!(
-                    f,
-                    "CometFilterExec: {}{}",
-                    self.predicate, display_projections
-                )
-            }
-            DisplayFormatType::TreeRender => unimplemented!(),
-        }
-    }
-}
-
-impl ExecutionPlan for FilterExec {
-    fn name(&self) -> &'static str {
-        "CometFilterExec"
-    }
-
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn properties(&self) -> &PlanProperties {
-        &self.cache
-    }
-
-    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        vec![&self.input]
-    }
-
-    fn maintains_input_order(&self) -> Vec<bool> {
-        // Tell optimizer this operator doesn't reorder its input
-        vec![true]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        mut children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
-            .and_then(|e| {
-                let selectivity = e.default_selectivity();
-                e.with_default_selectivity(selectivity)
-            })
-            .and_then(|e| e.with_projection(self.projection().cloned()))
-            .map(|e| Arc::new(e) as _)
-    }
-
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
-        trace!(
-            "Start FilterExec::execute for partition {} of context session_id 
{} and task_id {:?}",
-            partition,
-            context.session_id(),
-            context.task_id()
-        );
-        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-        Ok(Box::pin(FilterExecStream {
-            schema: self.schema(),
-            predicate: Arc::clone(&self.predicate),
-            input: self.input.execute(partition, context)?,
-            baseline_metrics,
-            projection: self.projection.clone(),
-        }))
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics.clone_inner())
-    }
-
-    /// The output statistics of a filtering operation can be estimated if the
-    /// predicate's selectivity value can be determined for the incoming data.
-    fn statistics(&self) -> Result<Statistics> {
-        let stats =
-            Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)?;
-        Ok(stats.project(self.projection.as_ref()))
-    }
-
-    fn cardinality_effect(&self) -> CardinalityEffect {
-        CardinalityEffect::LowerEqual
-    }
-}
-
-/// This function ensures that all bounds in the `ExprBoundaries` vector are
-/// converted to closed bounds. If a lower/upper bound is initially open, it
-/// is adjusted by using the next/previous value for its data type to convert
-/// it into a closed bound.
-fn collect_new_statistics(
-    input_column_stats: &[ColumnStatistics],
-    analysis_boundaries: Vec<ExprBoundaries>,
-) -> Vec<ColumnStatistics> {
-    analysis_boundaries
-        .into_iter()
-        .enumerate()
-        .map(
-            |(
-                idx,
-                ExprBoundaries {
-                    interval,
-                    distinct_count,
-                    ..
-                },
-            )| {
-                let Some(interval) = interval else {
-                    // If the interval is `None`, we can say that there are no rows:
-                    return ColumnStatistics {
-                        null_count: Precision::Exact(0),
-                        max_value: Precision::Exact(ScalarValue::Null),
-                        min_value: Precision::Exact(ScalarValue::Null),
-                        sum_value: Precision::Exact(ScalarValue::Null),
-                        distinct_count: Precision::Exact(0),
-                    };
-                };
-                let (lower, upper) = interval.into_bounds();
-                let (min_value, max_value) = if lower.eq(&upper) {
-                    (Precision::Exact(lower), Precision::Exact(upper))
-                } else {
-                    (Precision::Inexact(lower), Precision::Inexact(upper))
-                };
-                ColumnStatistics {
-                    null_count: input_column_stats[idx].null_count.to_inexact(),
-                    max_value,
-                    min_value,
-                    sum_value: Precision::Absent,
-                    distinct_count: distinct_count.to_inexact(),
-                }
-            },
-        )
-        .collect()
-}
-
-/// The FilterExec streams wraps the input iterator and applies the predicate expression to
-/// determine which rows to include in its output batches
-struct FilterExecStream {
-    /// Output schema after the projection
-    schema: SchemaRef,
-    /// The expression to filter on. This expression must evaluate to a boolean value.
-    predicate: Arc<dyn PhysicalExpr>,
-    /// The input partition to filter.
-    input: SendableRecordBatchStream,
-    /// Runtime metrics recording
-    baseline_metrics: BaselineMetrics,
-    /// The projection indices of the columns in the input schema
-    projection: Option<Vec<usize>>,
-}
-
-fn filter_and_project(
-    batch: &RecordBatch,
-    predicate: &Arc<dyn PhysicalExpr>,
-    projection: Option<&Vec<usize>>,
-    output_schema: &SchemaRef,
-) -> Result<RecordBatch> {
-    predicate
-        .evaluate(batch)
-        .and_then(|v| v.into_array(batch.num_rows()))
-        .and_then(|array| {
-            Ok(match (as_boolean_array(&array), projection) {
-                // Apply filter array to record batch
-                (Ok(filter_array), None) => comet_filter_record_batch(batch, filter_array)?,
-                (Ok(filter_array), Some(projection)) => {
-                    let projected_columns = projection
-                        .iter()
-                        .map(|i| Arc::clone(batch.column(*i)))
-                        .collect();
-                    let projected_batch =
-                        RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?;
-                    comet_filter_record_batch(&projected_batch, filter_array)?
-                }
-                (Err(_), _) => {
-                    return internal_err!("Cannot create filter_array from non-boolean predicates");
-                }
-            })
-        })
-}
-
-// BEGIN Comet changes
-// `FilterExec` could modify input batch or return input batch without change. Instead of always
-// adding `CopyExec` on top of it, we only copy input batch for the special case.
-pub fn comet_filter_record_batch(
-    record_batch: &RecordBatch,
-    predicate: &BooleanArray,
-) -> std::result::Result<RecordBatch, ArrowError> {
-    if predicate.true_count() == record_batch.num_rows() {
-        // special case where we just make an exact copy
-        let arrays: Vec<ArrayRef> = record_batch
-            .columns()
-            .iter()
-            .map(|array| {
-                let capacity = array.len();
-                let data = array.to_data();
-                let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
-                mutable.extend(0, 0, capacity);
-                make_array(mutable.freeze())
-            })
-            .collect();
-        let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows()));
-        RecordBatch::try_new_with_options(Arc::clone(&record_batch.schema()), arrays, &options)
-    } else {
-        filter_record_batch(record_batch, predicate)
-    }
-}
-// END Comet changes
-
-impl Stream for FilterExecStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let poll;
-        loop {
-            match ready!(self.input.poll_next_unpin(cx)) {
-                Some(Ok(batch)) => {
-                    let timer = self.baseline_metrics.elapsed_compute().timer();
-                    let filtered_batch = filter_and_project(
-                        &batch,
-                        &self.predicate,
-                        self.projection.as_ref(),
-                        &self.schema,
-                    )?;
-                    timer.done();
-                    // Skip entirely filtered batches
-                    if filtered_batch.num_rows() == 0 {
-                        continue;
-                    }
-                    poll = Poll::Ready(Some(Ok(filtered_batch)));
-                    break;
-                }
-                value => {
-                    poll = Poll::Ready(value);
-                    break;
-                }
-            }
-        }
-        self.baseline_metrics.record_poll(poll)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        // Same number of record batches
-        self.input.size_hint()
-    }
-}
-
-impl RecordBatchStream for FilterExecStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
-/// Return the equals Column-Pairs and Non-equals Column-Pairs
-fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual<'_> {
-    let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
-    let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
-
-    let predicates = split_conjunction(predicate);
-    predicates.into_iter().for_each(|p| {
-        if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
-            match binary.op() {
-                Operator::Eq => eq_predicate_columns.push((binary.left(), binary.right())),
-                Operator::NotEq => ne_predicate_columns.push((binary.left(), binary.right())),
-                _ => {}
-            }
-        }
-    });
-
-    (eq_predicate_columns, ne_predicate_columns)
-}
-
-/// Pair of `Arc<dyn PhysicalExpr>`s
-pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);
-
-/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates
-pub type EqualAndNonEqual<'a> = (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs
index 4e15e4341..c8cfebd45 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -22,14 +22,11 @@ use std::fmt::Debug;
 use jni::objects::GlobalRef;
 
 pub use copy::*;
-pub use filter::comet_filter_record_batch;
-pub use filter::FilterExec;
 pub use scan::*;
 
 mod copy;
 mod expand;
 pub use expand::ExpandExec;
-mod filter;
 mod scan;
 
 /// Error returned during executing operators.
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index a842efaa3..6d252743b 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::execution::operators::copy_array;
 use crate::{
     errors::CometError,
     execution::{
@@ -276,8 +277,16 @@ impl ScanExec {
             let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
 
             // TODO: validate array input data
+            // array_data.validate_full()?;
 
-            inputs.push(make_array(array_data));
+            let array = make_array(array_data);
+
+            // we copy the array so that we don't have to worry about potential memory
+            // corruption issues later on if underlying buffers are reused or freed
+            // TODO optimize this so that we only do this for Parquet inputs!
+            let array = copy_array(&array);
+
+            inputs.push(array);
 
             // Drop the Arcs to avoid memory leak
             unsafe {
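The commented-out `array_data.validate_full()?` in the hunk above marks where imported FFI data could be validated before use. For reference, a sketch of what Arrow's deep validation looks like on an owned `ArrayData` (illustrative example, not part of this commit):

    use arrow::array::{Array, Int32Array};
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        let array = Int32Array::from(vec![1, 2, 3]);
        let data = array.to_data();
        // validate_full checks buffer lengths, offsets, and value-level
        // invariants (e.g. UTF-8 well-formedness for string arrays), which
        // is what the TODO above would enable for imported data.
        data.validate_full()?;
        Ok(())
    }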
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 0b0b7668f..71bb0c748 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -18,7 +18,6 @@
 //! Converts Spark physical plan to DataFusion physical plan
 
 use crate::execution::operators::CopyMode;
-use crate::execution::operators::FilterExec as CometFilterExec;
 use crate::{
     errors::ExpressionError,
     execution::{
@@ -93,7 +92,7 @@ use arrow::array::{
 use arrow::buffer::BooleanBuffer;
 use datafusion::common::utils::SingleRowListArrayBuilder;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
+use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::GlobalLimitExec;
 use datafusion_comet_proto::spark_operator::SparkFilePartition;
 use datafusion_comet_proto::{
@@ -1178,25 +1177,17 @@ impl PhysicalPlanner {
                 let predicate =
                    self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?;
 
-                let filter: Arc<dyn ExecutionPlan> =
-                    match (filter.wrap_child_in_copy_exec, filter.use_datafusion_filter) {
-                        (true, true) => Arc::new(DataFusionFilterExec::try_new(
-                            predicate,
-                            Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)),
-                        )?),
-                        (true, false) => Arc::new(CometFilterExec::try_new(
-                            predicate,
-                            Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)),
-                        )?),
-                        (false, true) => Arc::new(DataFusionFilterExec::try_new(
-                            predicate,
-                            Arc::clone(&child.native_plan),
-                        )?),
-                        (false, false) => Arc::new(CometFilterExec::try_new(
-                            predicate,
-                            Arc::clone(&child.native_plan),
-                        )?),
-                    };
+                let filter: Arc<dyn ExecutionPlan> = if filter.wrap_child_in_copy_exec {
+                    Arc::new(FilterExec::try_new(
+                        predicate,
+                        Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)),
+                    )?)
+                } else {
+                    Arc::new(FilterExec::try_new(
+                        predicate,
+                        Arc::clone(&child.native_plan),
+                    )?)
+                };
 
                 Ok((
                     scans,
@@ -1564,14 +1555,7 @@ impl PhysicalPlanner {
                 // the data corruption. Note that we only need to copy the input batch
                 // if the child operator is `ScanExec`, because other operators after `ScanExec`
                 // will create new arrays for the output batch.
-                let input = if can_reuse_input_batch(&child.native_plan) {
-                    Arc::new(CopyExec::new(
-                        Arc::clone(&child.native_plan),
-                        CopyMode::UnpackOrDeepCopy,
-                    ))
-                } else {
-                    Arc::clone(&child.native_plan)
-                };
+                let input = Arc::clone(&child.native_plan);
                 let expand = Arc::new(ExpandExec::new(projections, input, schema));
                 Ok((
                     scans,
@@ -1901,14 +1885,9 @@ impl PhysicalPlanner {
         ))
     }
 
-    /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays
-    /// and make a deep copy of other arrays if the plan re-uses batches.
+    /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays.
     fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
-        if can_reuse_input_batch(&plan) {
-            Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy))
-        } else {
-            Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
-        }
+        Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone))
     }
 
     /// Create a DataFusion physical aggregate expression from Spark physical 
aggregate expression
@@ -2609,32 +2588,6 @@ impl From<ExpressionError> for DataFusionError {
     }
 }
 
-/// Returns true if given operator can return input array as output array without
-/// modification. This is used to determine if we need to copy the input batch to avoid
-/// data corruption from reusing the input batch.
-fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
-    if op.as_any().is::<ScanExec>() {
-        // native_comet and native_iceberg_compat scan reuse mutable buffers
-        // so we need to make copies of the batches
-        // for now, we also copy even if the source is not a Parquet scan, but
-        // we will optimize this later
-        true
-    } else if op.as_any().is::<CopyExec>() {
-        let copy_exec = op.as_any().downcast_ref::<CopyExec>().unwrap();
-        copy_exec.mode() == &CopyMode::UnpackOrClone && can_reuse_input_batch(copy_exec.input())
-    } else if op.as_any().is::<CometFilterExec>() {
-        // CometFilterExec guarantees that all arrays have been copied
-        false
-    } else {
-        for child in op.children() {
-            if can_reuse_input_batch(child) {
-                return true;
-            }
-        }
-        false
-    }
-}
-
 /// Collects the indices of the columns in the input schema that are used in the expression
 /// and returns them as a pair of vectors, one for the left side and one for the right side.
 fn expr_to_columns(
@@ -3109,7 +3062,6 @@ mod tests {
             children: vec![child_op],
             op_struct: Some(OpStruct::Filter(spark_operator::Filter {
                 predicate: Some(expr),
-                use_datafusion_filter: false,
                 wrap_child_in_copy_exec: false,
             })),
         }
@@ -3123,7 +3075,7 @@ mod tests {
 
         let (_scans, filter_exec) = planner.create_plan(&op, &mut vec![], 1).unwrap();
 
-        assert_eq!("CometFilterExec", filter_exec.native_plan.name());
+        assert_eq!("FilterExec", filter_exec.native_plan.name());
         assert_eq!(1, filter_exec.children.len());
         assert_eq!(0, filter_exec.additional_native_plans.len());
     }
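With this change, `wrap_in_copy_exec` always uses `CopyMode::UnpackOrClone`: deep copies are no longer needed here because `ScanExec` copies at the FFI boundary, but dictionary unpacking still is, since some native expressions cannot operate on dictionary-encoded input (see the Scala side below). A sketch of what unpacking a dictionary array means, using `arrow::compute::cast` as an illustrative stand-in for CopyExec's internals:

    use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Int32Type};
    use std::sync::Arc;

    fn main() {
        // A dictionary-encoded string column: distinct values are stored
        // once and each row is a small integer key into them.
        let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
        let dict: ArrayRef = Arc::new(dict);

        // Unpacking materializes plain values so downstream expressions
        // never see the dictionary encoding.
        let unpacked = cast(&dict, &DataType::Utf8).unwrap();
        let strings = unpacked.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.value(0), "a");
        assert_eq!(strings.value(2), "a");
    }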
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index ac34def81..5cb332ef0 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -109,7 +109,6 @@ message Projection {
 
 message Filter {
   spark.spark_expression.Expr predicate = 1;
-  bool use_datafusion_filter = 2;
   // Some expressions don't support dictionary arrays, so may need to wrap the child in a CopyExec
   bool wrap_child_in_copy_exec = 3;
 }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index f0b4059b9..c646ceb69 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1788,22 +1788,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
         val cond = exprToProto(condition, child.output)
 
         if (cond.isDefined && childOp.nonEmpty) {
-          // We need to determine whether to use DataFusion's FilterExec or Comet's
-          // FilterExec. The difference is that DataFusion's implementation will sometimes pass
-          // batches through whereas the Comet implementation guarantees that a copy is always
-          // made, which is critical when using `native_comet` scans due to buffer re-use
-
-          // TODO this could be optimized more to stop walking the tree on hitting
-          //  certain operators such as join or aggregate which will copy batches
-          def containsNativeCometScan(plan: SparkPlan): Boolean = {
-            plan match {
-              case w: CometScanWrapper => containsNativeCometScan(w.originalPlan)
-              case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET
-              case _: CometNativeScanExec => false
-              case _ => plan.children.exists(containsNativeCometScan)
-            }
-          }
-
           // Some native expressions do not support operating on dictionary-encoded arrays, so
           // wrap the child in a CopyExec to unpack dictionaries first.
           def wrapChildInCopyExec(condition: Expression): Boolean = {
@@ -1816,7 +1800,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
           val filterBuilder = OperatorOuterClass.Filter
             .newBuilder()
             .setPredicate(cond.get)
-            .setUseDatafusionFilter(!containsNativeCometScan(op))
             .setWrapChildInCopyExec(wrapChildInCopyExec(condition))
           Some(builder.setFilter(filterBuilder).build())
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

