This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 2b0e6db77  chore: Simplify approach to avoiding memory corruption due to buffer reuse (#2156)
2b0e6db77 is described below

commit 2b0e6db77b2c27bfa91eb34d0a2469d1014c20e6
Author: Andy Grove <agr...@apache.org>
AuthorDate: Fri Aug 15 08:28:27 2025 -0600

    chore: Simplify approach to avoiding memory corruption due to buffer reuse (#2156)
---
 native/core/Cargo.toml                             |   4 -
 native/core/benches/filter.rs                      | 112 ----
 native/core/src/execution/operators/copy.rs        |   2 +-
 native/core/src/execution/operators/filter.rs      | 574 ---------
 native/core/src/execution/operators/mod.rs         |   3 -
 native/core/src/execution/operators/scan.rs        |  11 +-
 native/core/src/execution/planner.rs               |  80 +--
 native/proto/src/proto/operator.proto              |   1 -
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  17 -
 9 files changed, 27 insertions(+), 777 deletions(-)

diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 92b7da8e5..5bd62a890 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -119,7 +119,3 @@ harness = false
 [[bench]]
 name = "parquet_decode"
 harness = false
-
-[[bench]]
-name = "filter"
-harness = false
diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs
deleted file mode 100644
index 82fa4aac6..000000000
--- a/native/core/benches/filter.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.
See the License for the -// specific language governing permissions and limitations -// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; - -use arrow::array::builder::{BooleanBuilder, Int32Builder, StringBuilder}; -use arrow::array::{ArrayRef, RecordBatch}; -use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Field, Schema}; -use comet::execution::operators::comet_filter_record_batch; -use criterion::{criterion_group, criterion_main, Criterion}; -use std::hint::black_box; -use std::sync::Arc; -use std::time::Duration; - -fn criterion_benchmark(c: &mut Criterion) { - let mut group = c.benchmark_group("filter"); - - let num_rows = 8192; - let num_int_cols = 4; - let num_string_cols = 4; - - let batch = create_record_batch(num_rows, num_int_cols, num_string_cols); - - // create some different predicates - let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows); - for i in 0..num_rows { - predicate_select_few.append_value(i % 10 == 0); - predicate_select_many.append_value(i % 10 > 0); - predicate_select_all.append_value(true); - } - let predicate_select_few = predicate_select_few.finish(); - let predicate_select_many = predicate_select_many.finish(); - let predicate_select_all = predicate_select_all.finish(); - - // baseline uses Arrow's filter_record_batch method - group.bench_function("arrow_filter_record_batch - few rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("arrow_filter_record_batch - many rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) - }); - group.bench_function("arrow_filter_record_batch - all rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) - }); - - group.bench_function("comet_filter_record_batch - few rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("comet_filter_record_batch - many rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) - }); - group.bench_function("comet_filter_record_batch - all rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) - }); - - group.finish(); -} - -fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch { - let mut int32_builder = Int32Builder::with_capacity(num_rows); - let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); - for i in 0..num_rows { - int32_builder.append_value(i as i32); - string_builder.append_value(format!("this is string #{i}")); - } - let int32_array = Arc::new(int32_builder.finish()); - let string_array = Arc::new(string_builder.finish()); - - let mut fields = vec![]; - let mut columns: Vec<ArrayRef> = vec![]; - let mut i = 0; - for _ in 0..num_int_cols { - fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); - columns.push(int32_array.clone()); // note this is just copying a reference to the array - i += 1; - } - for _ in 0..num_string_cols { - fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); - columns.push(string_array.clone()); // note this is just copying a reference to 
the array - i += 1; - } - let schema = Schema::new(fields); - RecordBatch::try_new(Arc::new(schema), columns).unwrap() -} - -fn config() -> Criterion { - Criterion::default() - .measurement_time(Duration::from_millis(500)) - .warm_up_time(Duration::from_millis(500)) -} - -criterion_group! { - name = benches; - config = config(); - targets = criterion_benchmark -} -criterion_main!(benches); diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index f1e87c2e0..950be36b6 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -245,7 +245,7 @@ impl RecordBatchStream for CopyStream { } /// Copy an Arrow Array -fn copy_array(array: &dyn Array) -> ArrayRef { +pub(crate) fn copy_array(array: &dyn Array) -> ArrayRef { let capacity = array.len(); let data = array.to_data(); diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs deleted file mode 100644 index 272bbd4d8..000000000 --- a/native/core/src/execution/operators/filter.rs +++ /dev/null @@ -1,574 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::any::Any; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{ready, Context, Poll}; - -use datafusion::physical_plan::{ - metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, -}; - -use arrow::array::{ - make_array, Array, ArrayRef, BooleanArray, MutableArrayData, RecordBatchOptions, -}; -use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, SchemaRef}; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; -use datafusion::common::cast::as_boolean_array; -use datafusion::common::stats::Precision; -use datafusion::common::{ - internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, -}; -use datafusion::execution::TaskContext; -use datafusion::logical_expr::Operator; -use datafusion::physical_expr::equivalence::ProjectionMapping; -use datafusion::physical_expr::expressions::BinaryExpr; -use datafusion::physical_expr::intervals::utils::check_support; -use datafusion::physical_expr::utils::collect_columns; -use datafusion::physical_expr::{ - analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, - PhysicalExpr, -}; -use datafusion::physical_plan::common::can_project; -use datafusion::physical_plan::execution_plan::CardinalityEffect; -use futures::stream::{Stream, StreamExt}; -use log::trace; - -/// This is a copy of DataFusion's FilterExec with one modification to ensure that input -/// batches are never passed through unchanged. The changes are between the comments -/// `BEGIN Comet change` and `END Comet change`. -#[derive(Debug, Clone)] -pub struct FilterExec { - /// The expression to filter on. This expression must evaluate to a boolean value. - predicate: Arc<dyn PhysicalExpr>, - /// The input plan - input: Arc<dyn ExecutionPlan>, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, - /// Selectivity for statistics. 0 = no rows, 100 = all rows - default_selectivity: u8, - /// Properties equivalence properties, partitioning, etc. - cache: PlanProperties, - /// The projection indices of the columns in the output schema of join - projection: Option<Vec<usize>>, -} - -impl FilterExec { - /// Create a FilterExec on an input - pub fn try_new( - predicate: Arc<dyn PhysicalExpr>, - input: Arc<dyn ExecutionPlan>, - ) -> Result<Self> { - match predicate.data_type(input.schema().as_ref())? { - DataType::Boolean => { - let default_selectivity = 20; - let cache = - Self::compute_properties(&input, &predicate, default_selectivity, None)?; - Ok(Self { - predicate, - input: Arc::clone(&input), - metrics: ExecutionPlanMetricsSet::new(), - default_selectivity, - cache, - projection: None, - }) - } - other => { - plan_err!("Filter predicate must return BOOLEAN values, got {other:?}") - } - } - } - - pub fn with_default_selectivity( - mut self, - default_selectivity: u8, - ) -> Result<Self, DataFusionError> { - if default_selectivity > 100 { - return plan_err!( - "Default filter selectivity value needs to be less than or equal to 100" - ); - } - self.default_selectivity = default_selectivity; - Ok(self) - } - - /// Return new instance of [FilterExec] with the given projection. 
- pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> { - // Check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - - let cache = Self::compute_properties( - &self.input, - &self.predicate, - self.default_selectivity, - projection.as_ref(), - )?; - Ok(Self { - predicate: Arc::clone(&self.predicate), - input: Arc::clone(&self.input), - metrics: self.metrics.clone(), - default_selectivity: self.default_selectivity, - cache, - projection, - }) - } - - /// The expression to filter on. This expression must evaluate to a boolean value. - pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> { - &self.predicate - } - - /// The input plan - pub fn input(&self) -> &Arc<dyn ExecutionPlan> { - &self.input - } - - /// The default selectivity - pub fn default_selectivity(&self) -> u8 { - self.default_selectivity - } - - /// Projection - pub fn projection(&self) -> Option<&Vec<usize>> { - self.projection.as_ref() - } - - /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. - fn statistics_helper( - input: &Arc<dyn ExecutionPlan>, - predicate: &Arc<dyn PhysicalExpr>, - default_selectivity: u8, - ) -> Result<Statistics> { - let input_stats = input.partition_statistics(None)?; - let schema = input.schema(); - if !check_support(predicate, &schema) { - let selectivity = default_selectivity as f64 / 100.0; - let mut stats = input_stats.to_inexact(); - stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); - stats.total_byte_size = stats - .total_byte_size - .with_estimated_selectivity(selectivity); - return Ok(stats); - } - - let num_rows = input_stats.num_rows; - let total_byte_size = input_stats.total_byte_size; - let input_analysis_ctx = - AnalysisContext::try_from_statistics(&input.schema(), &input_stats.column_statistics)?; - - let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?; - - // Estimate (inexact) selectivity of predicate - let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); - let num_rows = num_rows.with_estimated_selectivity(selectivity); - let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity); - - let column_statistics = - collect_new_statistics(&input_stats.column_statistics, analysis_ctx.boundaries); - Ok(Statistics { - num_rows, - total_byte_size, - column_statistics, - }) - } - - fn extend_constants( - input: &Arc<dyn ExecutionPlan>, - predicate: &Arc<dyn PhysicalExpr>, - ) -> Vec<ConstExpr> { - let mut res_constants = Vec::new(); - let input_eqs = input.equivalence_properties(); - - let conjunctions = split_conjunction(predicate); - for conjunction in conjunctions { - if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() { - if binary.op() == &Operator::Eq { - // Filter evaluates to single value for all partitions - if input_eqs.is_expr_constant(binary.left()).is_some() { - let across = input_eqs - .is_expr_constant(binary.right()) - .unwrap_or_default(); - res_constants.push(ConstExpr::new(Arc::clone(binary.right()), across)); - } else if input_eqs.is_expr_constant(binary.right()).is_some() { - let across = input_eqs - .is_expr_constant(binary.left()) - .unwrap_or_default(); - res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across)); - } - } - } - } - res_constants - } - 
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties( - input: &Arc<dyn ExecutionPlan>, - predicate: &Arc<dyn PhysicalExpr>, - default_selectivity: u8, - projection: Option<&Vec<usize>>, - ) -> Result<PlanProperties> { - // Combine the equal predicates with the input equivalence properties - // to construct the equivalence properties: - let stats = Self::statistics_helper(input, predicate, default_selectivity)?; - let mut eq_properties = input.equivalence_properties().clone(); - let (equal_pairs, _) = collect_columns_from_predicate(predicate); - for (lhs, rhs) in equal_pairs { - eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))? - } - // Add the columns that have only one viable value (singleton) after - // filtering to constants. - let constants = collect_columns(predicate) - .into_iter() - .filter(|column| stats.column_statistics[column.index()].is_singleton()) - .map(|column| { - let value = stats.column_statistics[column.index()] - .min_value - .get_value(); - let expr = Arc::new(column) as _; - ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned())) - }); - // This is for statistics - eq_properties.add_constants(constants)?; - // This is for logical constant (for example: a = '1', then a could be marked as a constant) - // to do: how to deal with a multiple situation to represent = (for example, c1 between 0 and 0) - eq_properties.add_constants(Self::extend_constants(input, predicate))?; - - let mut output_partitioning = input.output_partitioning().clone(); - // If contains projection, update the PlanProperties. - if let Some(projection) = projection { - let schema = eq_properties.schema(); - let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; - output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); - eq_properties = eq_properties.project(&projection_mapping, out_schema); - } - - Ok(PlanProperties::new( - eq_properties, - output_partitioning, - input.pipeline_behavior(), - input.boundedness(), - )) - } -} - -impl DisplayAs for FilterExec { - fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let display_projections = if let Some(projection) = self.projection.as_ref() { - format!( - ", projection=[{}]", - projection - .iter() - .map(|index| format!( - "{}@{}", - self.input.schema().fields().get(*index).unwrap().name(), - index - )) - .collect::<Vec<_>>() - .join(", ") - ) - } else { - "".to_string() - }; - write!( - f, - "CometFilterExec: {}{}", - self.predicate, display_projections - ) - } - DisplayFormatType::TreeRender => unimplemented!(), - } - } -} - -impl ExecutionPlan for FilterExec { - fn name(&self) -> &'static str { - "CometFilterExec" - } - - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { - vec![&self.input] - } - - fn maintains_input_order(&self) -> Vec<bool> { - // Tell optimizer this operator doesn't reorder its input - vec![true] - } - - fn with_new_children( - self: Arc<Self>, - mut children: Vec<Arc<dyn ExecutionPlan>>, - ) -> Result<Arc<dyn ExecutionPlan>> { - FilterExec::try_new(Arc::clone(&self.predicate), 
children.swap_remove(0)) - .and_then(|e| { - let selectivity = e.default_selectivity(); - e.with_default_selectivity(selectivity) - }) - .and_then(|e| e.with_projection(self.projection().cloned())) - .map(|e| Arc::new(e) as _) - } - - fn execute( - &self, - partition: usize, - context: Arc<TaskContext>, - ) -> Result<SendableRecordBatchStream> { - trace!( - "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", - partition, - context.session_id(), - context.task_id() - ); - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - Ok(Box::pin(FilterExecStream { - schema: self.schema(), - predicate: Arc::clone(&self.predicate), - input: self.input.execute(partition, context)?, - baseline_metrics, - projection: self.projection.clone(), - })) - } - - fn metrics(&self) -> Option<MetricsSet> { - Some(self.metrics.clone_inner()) - } - - /// The output statistics of a filtering operation can be estimated if the - /// predicate's selectivity value can be determined for the incoming data. - fn statistics(&self) -> Result<Statistics> { - let stats = - Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)?; - Ok(stats.project(self.projection.as_ref())) - } - - fn cardinality_effect(&self) -> CardinalityEffect { - CardinalityEffect::LowerEqual - } -} - -/// This function ensures that all bounds in the `ExprBoundaries` vector are -/// converted to closed bounds. If a lower/upper bound is initially open, it -/// is adjusted by using the next/previous value for its data type to convert -/// it into a closed bound. -fn collect_new_statistics( - input_column_stats: &[ColumnStatistics], - analysis_boundaries: Vec<ExprBoundaries>, -) -> Vec<ColumnStatistics> { - analysis_boundaries - .into_iter() - .enumerate() - .map( - |( - idx, - ExprBoundaries { - interval, - distinct_count, - .. - }, - )| { - let Some(interval) = interval else { - // If the interval is `None`, we can say that there are no rows: - return ColumnStatistics { - null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), - distinct_count: Precision::Exact(0), - }; - }; - let (lower, upper) = interval.into_bounds(); - let (min_value, max_value) = if lower.eq(&upper) { - (Precision::Exact(lower), Precision::Exact(upper)) - } else { - (Precision::Inexact(lower), Precision::Inexact(upper)) - }; - ColumnStatistics { - null_count: input_column_stats[idx].null_count.to_inexact(), - max_value, - min_value, - sum_value: Precision::Absent, - distinct_count: distinct_count.to_inexact(), - } - }, - ) - .collect() -} - -/// The FilterExec streams wraps the input iterator and applies the predicate expression to -/// determine which rows to include in its output batches -struct FilterExecStream { - /// Output schema after the projection - schema: SchemaRef, - /// The expression to filter on. This expression must evaluate to a boolean value. - predicate: Arc<dyn PhysicalExpr>, - /// The input partition to filter. 
- input: SendableRecordBatchStream, - /// Runtime metrics recording - baseline_metrics: BaselineMetrics, - /// The projection indices of the columns in the input schema - projection: Option<Vec<usize>>, -} - -fn filter_and_project( - batch: &RecordBatch, - predicate: &Arc<dyn PhysicalExpr>, - projection: Option<&Vec<usize>>, - output_schema: &SchemaRef, -) -> Result<RecordBatch> { - predicate - .evaluate(batch) - .and_then(|v| v.into_array(batch.num_rows())) - .and_then(|array| { - Ok(match (as_boolean_array(&array), projection) { - // Apply filter array to record batch - (Ok(filter_array), None) => comet_filter_record_batch(batch, filter_array)?, - (Ok(filter_array), Some(projection)) => { - let projected_columns = projection - .iter() - .map(|i| Arc::clone(batch.column(*i))) - .collect(); - let projected_batch = - RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?; - comet_filter_record_batch(&projected_batch, filter_array)? - } - (Err(_), _) => { - return internal_err!("Cannot create filter_array from non-boolean predicates"); - } - }) - }) -} - -// BEGIN Comet changes -// `FilterExec` could modify input batch or return input batch without change. Instead of always -// adding `CopyExec` on top of it, we only copy input batch for the special case. -pub fn comet_filter_record_batch( - record_batch: &RecordBatch, - predicate: &BooleanArray, -) -> std::result::Result<RecordBatch, ArrowError> { - if predicate.true_count() == record_batch.num_rows() { - // special case where we just make an exact copy - let arrays: Vec<ArrayRef> = record_batch - .columns() - .iter() - .map(|array| { - let capacity = array.len(); - let data = array.to_data(); - let mut mutable = MutableArrayData::new(vec![&data], false, capacity); - mutable.extend(0, 0, capacity); - make_array(mutable.freeze()) - }) - .collect(); - let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows())); - RecordBatch::try_new_with_options(Arc::clone(&record_batch.schema()), arrays, &options) - } else { - filter_record_batch(record_batch, predicate) - } -} -// END Comet changes - -impl Stream for FilterExecStream { - type Item = Result<RecordBatch>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let poll; - loop { - match ready!(self.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = filter_and_project( - &batch, - &self.predicate, - self.projection.as_ref(), - &self.schema, - )?; - timer.done(); - // Skip entirely filtered batches - if filtered_batch.num_rows() == 0 { - continue; - } - poll = Poll::Ready(Some(Ok(filtered_batch))); - break; - } - value => { - poll = Poll::Ready(value); - break; - } - } - } - self.baseline_metrics.record_poll(poll) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - // Same number of record batches - self.input.size_hint() - } -} - -impl RecordBatchStream for FilterExecStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - -/// Return the equals Column-Pairs and Non-equals Column-Pairs -fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual<'_> { - let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new(); - let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new(); - - let predicates = split_conjunction(predicate); - predicates.into_iter().for_each(|p| { - if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() { - match binary.op() { - 
Operator::Eq => eq_predicate_columns.push((binary.left(), binary.right())), - Operator::NotEq => ne_predicate_columns.push((binary.left(), binary.right())), - _ => {} - } - } - }); - - (eq_predicate_columns, ne_predicate_columns) -} - -/// Pair of `Arc<dyn PhysicalExpr>`s -pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>); - -/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates -pub type EqualAndNonEqual<'a> = (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>); diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 4e15e4341..c8cfebd45 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -22,14 +22,11 @@ use std::fmt::Debug; use jni::objects::GlobalRef; pub use copy::*; -pub use filter::comet_filter_record_batch; -pub use filter::FilterExec; pub use scan::*; mod copy; mod expand; pub use expand::ExpandExec; -mod filter; mod scan; /// Error returned during executing operators. diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index a842efaa3..6d252743b 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::execution::operators::copy_array; use crate::{ errors::CometError, execution::{ @@ -276,8 +277,16 @@ impl ScanExec { let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; // TODO: validate array input data + // array_data.validate_full()?; - inputs.push(make_array(array_data)); + let array = make_array(array_data); + + // we copy the array to that we don't have to worry about potential memory + // corruption issues later on if underlying buffers are reused or freed + // TODO optimize this so that we only do this for Parquet inputs! + let array = copy_array(&array); + + inputs.push(array); // Drop the Arcs to avoid memory leak unsafe { diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0b0b7668f..71bb0c748 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -18,7 +18,6 @@ //! 
Converts Spark physical plan to DataFusion physical plan use crate::execution::operators::CopyMode; -use crate::execution::operators::FilterExec as CometFilterExec; use crate::{ errors::ExpressionError, execution::{ @@ -93,7 +92,7 @@ use arrow::array::{ use arrow::buffer::BooleanBuffer; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; -use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec; +use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::GlobalLimitExec; use datafusion_comet_proto::spark_operator::SparkFilePartition; use datafusion_comet_proto::{ @@ -1178,25 +1177,17 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - let filter: Arc<dyn ExecutionPlan> = - match (filter.wrap_child_in_copy_exec, filter.use_datafusion_filter) { - (true, true) => Arc::new(DataFusionFilterExec::try_new( - predicate, - Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), - )?), - (true, false) => Arc::new(CometFilterExec::try_new( - predicate, - Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), - )?), - (false, true) => Arc::new(DataFusionFilterExec::try_new( - predicate, - Arc::clone(&child.native_plan), - )?), - (false, false) => Arc::new(CometFilterExec::try_new( - predicate, - Arc::clone(&child.native_plan), - )?), - }; + let filter: Arc<dyn ExecutionPlan> = if filter.wrap_child_in_copy_exec { + Arc::new(FilterExec::try_new( + predicate, + Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), + )?) + } else { + Arc::new(FilterExec::try_new( + predicate, + Arc::clone(&child.native_plan), + )?) + }; Ok(( scans, @@ -1564,14 +1555,7 @@ impl PhysicalPlanner { // the data corruption. Note that we only need to copy the input batch // if the child operator is `ScanExec`, because other operators after `ScanExec` // will create new arrays for the output batch. - let input = if can_reuse_input_batch(&child.native_plan) { - Arc::new(CopyExec::new( - Arc::clone(&child.native_plan), - CopyMode::UnpackOrDeepCopy, - )) - } else { - Arc::clone(&child.native_plan) - }; + let input = Arc::clone(&child.native_plan); let expand = Arc::new(ExpandExec::new(projections, input, schema)); Ok(( scans, @@ -1901,14 +1885,9 @@ impl PhysicalPlanner { )) } - /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays - /// and make a deep copy of other arrays if the plan re-uses batches. + /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays. fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { - if can_reuse_input_batch(&plan) { - Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) - } + Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) } /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression @@ -2609,32 +2588,6 @@ impl From<ExpressionError> for DataFusionError { } } -/// Returns true if given operator can return input array as output array without -/// modification. This is used to determine if we need to copy the input batch to avoid -/// data corruption from reusing the input batch. 
-fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool { - if op.as_any().is::<ScanExec>() { - // native_comet and native_iceberg_compat scan reuse mutable buffers - // so we need to make copies of the batches - // for now, we also copy even if the source is not a Parquet scan, but - // we will optimize this later - true - } else if op.as_any().is::<CopyExec>() { - let copy_exec = op.as_any().downcast_ref::<CopyExec>().unwrap(); - copy_exec.mode() == &CopyMode::UnpackOrClone && can_reuse_input_batch(copy_exec.input()) - } else if op.as_any().is::<CometFilterExec>() { - // CometFilterExec guarantees that all arrays have been copied - false - } else { - for child in op.children() { - if can_reuse_input_batch(child) { - return true; - } - } - false - } -} - /// Collects the indices of the columns in the input schema that are used in the expression /// and returns them as a pair of vectors, one for the left side and one for the right side. fn expr_to_columns( @@ -3109,7 +3062,6 @@ mod tests { children: vec![child_op], op_struct: Some(OpStruct::Filter(spark_operator::Filter { predicate: Some(expr), - use_datafusion_filter: false, wrap_child_in_copy_exec: false, })), } @@ -3123,7 +3075,7 @@ mod tests { let (_scans, filter_exec) = planner.create_plan(&op, &mut vec![], 1).unwrap(); - assert_eq!("CometFilterExec", filter_exec.native_plan.name()); + assert_eq!("FilterExec", filter_exec.native_plan.name()); assert_eq!(1, filter_exec.children.len()); assert_eq!(0, filter_exec.additional_native_plans.len()); } diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index ac34def81..5cb332ef0 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -109,7 +109,6 @@ message Projection { message Filter { spark.spark_expression.Expr predicate = 1; - bool use_datafusion_filter = 2; // Some expressions don't support dictionary arrays, so may need to wrap the child in a CopyExec bool wrap_child_in_copy_exec = 3; } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f0b4059b9..c646ceb69 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1788,22 +1788,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val cond = exprToProto(condition, child.output) if (cond.isDefined && childOp.nonEmpty) { - // We need to determine whether to use DataFusion's FilterExec or Comet's - // FilterExec. The difference is that DataFusion's implementation will sometimes pass - // batches through whereas the Comet implementation guarantees that a copy is always - // made, which is critical when using `native_comet` scans due to buffer re-use - - // TODO this could be optimized more to stop walking the tree on hitting - // certain operators such as join or aggregate which will copy batches - def containsNativeCometScan(plan: SparkPlan): Boolean = { - plan match { - case w: CometScanWrapper => containsNativeCometScan(w.originalPlan) - case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET - case _: CometNativeScanExec => false - case _ => plan.children.exists(containsNativeCometScan) - } - } - // Some native expressions do not support operating on dictionary-encoded arrays, so // wrap the child in a CopyExec to unpack dictionaries first. 
def wrapChildInCopyExec(condition: Expression): Boolean = { @@ -1816,7 +1800,6 @@ val filterBuilder = OperatorOuterClass.Filter .newBuilder() .setPredicate(cond.get) - .setUseDatafusionFilter(!containsNativeCometScan(op)) .setWrapChildInCopyExec(wrapChildInCopyExec(condition)) Some(builder.setFilter(filterBuilder).build()) } else {