This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c1fc06c4 fix: remove code duplication in native_datafusion and 
native_iceberg_compat implementations (#1443)
3c1fc06c4 is described below

commit 3c1fc06c465f668800d747f3feb6295340c3329b
Author: Parth Chandra <[email protected]>
AuthorDate: Wed Mar 19 09:42:38 2025 -0700

    fix: remove code duplication in native_datafusion and native_iceberg_compat 
implementations (#1443)
    
    * fix: remove code duplication in native_datafusion and 
native_iceberg_compat implementations
---
 native/core/src/execution/planner.rs    | 174 +++++++++++++-------------------
 native/core/src/parquet/mod.rs          |  91 +++++++----------
 native/core/src/parquet/parquet_exec.rs | 139 +++++++++++++++++++++++++
 3 files changed, 248 insertions(+), 156 deletions(-)

diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index 87c8fc4b2..4c3a0fde9 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -72,19 +72,17 @@ use datafusion::{
 };
 use datafusion_comet_spark_expr::{create_comet_physical_fun, 
create_negate_expr};
 
+use crate::execution::operators::ExecutionError::GeneralError;
 use crate::execution::shuffle::CompressionCodec;
 use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::{prepare_object_store, 
SparkParquetOptions};
-use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
-use datafusion::common::config::TableParquetOptions;
+use crate::parquet::parquet_exec::init_datasource_exec;
+use crate::parquet::parquet_support::prepare_object_store;
 use datafusion::common::scalar::ScalarStructBuilder;
 use datafusion::common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, 
TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
-use datafusion::datasource::source::DataSourceExec;
 use datafusion::functions_nested::array_has::ArrayHas;
 use 
datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression;
 use datafusion::logical_expr::{
@@ -94,8 +92,10 @@ use datafusion::logical_expr::{
 use datafusion::physical_expr::expressions::{Literal, StatsType};
 use datafusion::physical_expr::window::WindowExpr;
 use datafusion::physical_expr::LexOrdering;
+
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
+use datafusion_comet_proto::spark_operator::SparkFilePartition;
 use datafusion_comet_proto::{
     spark_expression::{
         self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, 
literal::Value, AggExpr,
@@ -181,6 +181,60 @@ impl PhysicalPlanner {
         }
     }
 
+    /// get DataFusion PartitionedFiles from a Spark FilePartition
+    fn get_partitioned_files(
+        &self,
+        partition: &SparkFilePartition,
+    ) -> Result<Vec<PartitionedFile>, ExecutionError> {
+        let mut files = Vec::with_capacity(partition.partitioned_file.len());
+        partition.partitioned_file.iter().try_for_each(|file| {
+            assert!(file.start + file.length <= file.file_size);
+
+            let mut partitioned_file = PartitionedFile::new_with_range(
+                String::new(), // Dummy file path.
+                file.file_size as u64,
+                file.start,
+                file.start + file.length,
+            );
+
+            // Spark sends the path over as URL-encoded, parse that first.
+            let url =
+                Url::parse(file.file_path.as_ref()).map_err(|e| 
GeneralError(e.to_string()))?;
+            // Convert that to a Path object to use in the PartitionedFile.
+            let path = Path::from_url_path(url.path()).map_err(|e| 
GeneralError(e.to_string()))?;
+            partitioned_file.object_meta.location = path;
+
+            // Process partition values
+            // Create an empty input schema for partition values because they 
are all literals.
+            let empty_schema = Arc::new(Schema::empty());
+            let partition_values: Result<Vec<_>, _> = file
+                .partition_values
+                .iter()
+                .map(|partition_value| {
+                    let literal =
+                        self.create_expr(partition_value, 
Arc::<Schema>::clone(&empty_schema))?;
+                    literal
+                        .as_any()
+                        .downcast_ref::<DataFusionLiteral>()
+                        .ok_or_else(|| {
+                            ExecutionError::GeneralError(
+                                "Expected literal of partition 
value".to_string(),
+                            )
+                        })
+                        .map(|literal| literal.value().clone())
+                })
+                .collect();
+            let partition_values = partition_values?;
+
+            partitioned_file.partition_values = partition_values;
+
+            files.push(partitioned_file);
+            Ok::<(), ExecutionError>(())
+        })?;
+
+        Ok(files)
+    }
+
     /// Create a DataFusion physical expression from Spark physical expression
     fn create_expr(
         &self,
@@ -1195,19 +1249,6 @@ impl PhysicalPlanner {
                     .map(|expr| self.create_expr(expr, 
Arc::clone(&required_schema)))
                     .collect();
 
-                // Create a conjunctive form of the vector because 
ParquetExecBuilder takes
-                // a single expression
-                let data_filters = data_filters?;
-                let cnf_data_filters = 
data_filters.clone().into_iter().reduce(|left, right| {
-                    Arc::new(BinaryExpr::new(
-                        left,
-                        datafusion::logical_expr::Operator::And,
-                        right,
-                    ))
-                });
-
-                // By default, local FS object store registered
-                // if `hdfs` feature enabled then HDFS file object store 
registered
                 // Get one file from the list of files
                 let one_file = scan
                     .file_partitions
@@ -1224,53 +1265,7 @@ impl PhysicalPlanner {
                 let mut file_groups: Vec<Vec<PartitionedFile>> =
                     Vec::with_capacity(partition_count);
                 scan.file_partitions.iter().try_for_each(|partition| {
-                    let mut files = 
Vec::with_capacity(partition.partitioned_file.len());
-                    partition.partitioned_file.iter().try_for_each(|file| {
-                        assert!(file.start + file.length <= file.file_size);
-
-                        let mut partitioned_file = 
PartitionedFile::new_with_range(
-                            String::new(), // Dummy file path.
-                            file.file_size as u64,
-                            file.start,
-                            file.start + file.length,
-                        );
-
-                        // Spark sends the path over as URL-encoded, parse 
that first.
-                        let url = Url::parse(file.file_path.as_ref()).unwrap();
-                        // Convert that to a Path object to use in the 
PartitionedFile.
-                        let path = Path::from_url_path(url.path()).unwrap();
-                        partitioned_file.object_meta.location = path;
-
-                        // Process partition values
-                        // Create an empty input schema for partition values 
because they are all literals.
-                        let empty_schema = Arc::new(Schema::empty());
-                        let partition_values: Result<Vec<_>, _> = file
-                            .partition_values
-                            .iter()
-                            .map(|partition_value| {
-                                let literal = self.create_expr(
-                                    partition_value,
-                                    Arc::<Schema>::clone(&empty_schema),
-                                )?;
-                                literal
-                                    .as_any()
-                                    .downcast_ref::<DataFusionLiteral>()
-                                    .ok_or_else(|| {
-                                        ExecutionError::GeneralError(
-                                            "Expected literal of partition 
value".to_string(),
-                                        )
-                                    })
-                                    .map(|literal| literal.value().clone())
-                            })
-                            .collect();
-                        let partition_values = partition_values?;
-
-                        partitioned_file.partition_values = partition_values;
-
-                        files.push(partitioned_file);
-                        Ok::<(), ExecutionError>(())
-                    })?;
-
+                    let files = self.get_partitioned_files(partition)?;
                     file_groups.push(files);
                     Ok::<(), ExecutionError>(())
                 })?;
@@ -1284,47 +1279,20 @@ impl PhysicalPlanner {
                         Field::new(field.name(), field.data_type().clone(), 
field.is_nullable())
                     })
                     .collect_vec();
-
-                let mut table_parquet_options = TableParquetOptions::new();
-                // TODO: Maybe these are configs?
-                table_parquet_options.global.pushdown_filters = true;
-                table_parquet_options.global.reorder_filters = true;
-
-                let mut spark_parquet_options = SparkParquetOptions::new(
-                    EvalMode::Legacy,
-                    scan.session_timezone.as_str(),
-                    false,
-                );
-                spark_parquet_options.allow_cast_unsigned_ints = true;
-
-                let mut parquet_source = 
ParquetSource::new(table_parquet_options)
-                    
.with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
-                        spark_parquet_options,
-                    )));
-
-                if let Some(filter) = cnf_data_filters {
-                    parquet_source =
-                        
parquet_source.with_predicate(Arc::clone(&data_schema), filter);
-                }
-
-                let mut file_scan_config = FileScanConfig::new(
+                let scan = init_datasource_exec(
+                    required_schema,
+                    Some(data_schema),
+                    Some(partition_schema),
+                    Some(partition_fields),
                     object_store_url,
-                    Arc::clone(&data_schema),
-                    Arc::new(parquet_source),
-                )
-                .with_file_groups(file_groups)
-                .with_table_partition_cols(partition_fields);
-
-                assert_eq!(
-                    projection_vector.len(),
-                    required_schema.fields.len() + 
partition_schema.fields.len()
-                );
-                file_scan_config = 
file_scan_config.with_projection(Some(projection_vector));
-
-                let scan = DataSourceExec::new(Arc::new(file_scan_config));
+                    file_groups,
+                    Some(projection_vector),
+                    Some(data_filters?),
+                    scan.session_timezone.as_str(),
+                )?;
                 Ok((
                     vec![],
-                    Arc::new(SparkPlan::new(spark_plan.plan_id, 
Arc::new(scan), vec![])),
+                    Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
                 ))
             }
             OpStruct::Scan(scan) => {
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 882a9ed6d..17109d31b 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -21,6 +21,7 @@ pub use mutable_vector::*;
 
 #[macro_use]
 pub mod util;
+pub mod parquet_exec;
 pub mod parquet_support;
 pub mod read;
 pub mod schema_adapter;
@@ -46,23 +47,21 @@ use self::util::jni::TypePromotionInfo;
 use crate::execution::operators::ExecutionError;
 use crate::execution::utils::SparkArrowConvert;
 use crate::parquet::data_type::AsBytes;
-use crate::parquet::parquet_support::{prepare_object_store, 
SparkParquetOptions};
-use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+use crate::parquet::parquet_exec::init_datasource_exec;
+use crate::parquet::parquet_support::prepare_object_store;
 use arrow::array::{Array, RecordBatch};
 use arrow::buffer::{Buffer, MutableBuffer};
-use datafusion::common::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
-use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
-use datafusion_comet_spark_expr::EvalMode;
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, 
JString, ReleaseMode};
 use jni::sys::jstring;
+use object_store::path::Path;
 use read::ColumnReader;
 use util::jni::{convert_column_descriptor, convert_encoding, 
deserialize_schema};
+
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
@@ -620,12 +619,21 @@ fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut 
BatchContext, CometErr
     }
 }
 
-/*
-#[inline]
-fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut 
ParquetRecordBatchReader, CometError> {
-    Ok(&mut get_batch_context(handle)?.batch_reader.unwrap())
+fn get_file_groups_single_file(
+    path: &Path,
+    file_size: u64,
+    start: i64,
+    length: i64,
+) -> Vec<Vec<PartitionedFile>> {
+    let mut partitioned_file = PartitionedFile::new_with_range(
+        String::new(), // Dummy file path. We will override this with our path 
so that url encoding does not occur
+        file_size,
+        start,
+        start + length,
+    );
+    partitioned_file.object_meta.location = (*path).clone();
+    vec![vec![partitioned_file]]
 }
-*/
 
 /// # Safety
 /// This function is inherently unsafe since it deals with raw pointers passed 
from JNI.
@@ -645,65 +653,42 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_parquet_Native_initRecordBat
             .get_string(&JString::from_raw(file_path))
             .unwrap()
             .into();
-        let batch_stream: Option<SendableRecordBatchStream>;
-        // TODO: (ARROW NATIVE) Use the common global runtime
+
         let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()
             .build()?;
         let session_ctx = SessionContext::new();
+
         let (object_store_url, object_store_path) =
             prepare_object_store(session_ctx.runtime_env(), path.clone())?;
 
-        // EXPERIMENTAL - BEGIN
-        // TODO: (ARROW NATIVE) - Remove code duplication between this and POC 
1
-        // copy the input on-heap buffer to native
         let required_schema_array = JByteArray::from_raw(required_schema);
         let required_schema_buffer = 
env.convert_byte_array(&required_schema_array)?;
-        let required_schema_arrow = 
deserialize_schema(required_schema_buffer.as_bytes())?;
-        let mut partitioned_file = PartitionedFile::new_with_range(
-            String::new(), // Dummy file path. We will override this with our 
path so that url encoding does not occur
-            file_size as u64,
-            start,
-            start + length,
-        );
-        partitioned_file.object_meta.location = object_store_path;
+        let required_schema = 
Arc::new(deserialize_schema(required_schema_buffer.as_bytes())?);
+
+        let file_groups =
+            get_file_groups_single_file(&object_store_path, file_size as u64, 
start, length);
+
         let session_timezone: String = env
             .get_string(&JString::from_raw(session_timezone))
             .unwrap()
             .into();
 
-        let mut spark_parquet_options =
-            SparkParquetOptions::new(EvalMode::Legacy, 
session_timezone.as_str(), false);
-        spark_parquet_options.allow_cast_unsigned_ints = true;
-
-        let mut table_parquet_options = TableParquetOptions::new();
-        // TODO: Maybe these are configs?
-        table_parquet_options.global.pushdown_filters = true;
-        table_parquet_options.global.reorder_filters = true;
-
-        let parquet_source = 
ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
-            Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
-        );
-
-        // We build the file scan config with the *required* schema so that 
the reader knows
-        // the output schema we want
-        let file_scan_config = FileScanConfig::new(object_store_url, 
Arc::new(required_schema_arrow), Arc::new(parquet_source))
-            .with_file(partitioned_file)
-            // TODO: (ARROW NATIVE) - do partition columns in native
-            //   - will need partition schema and partition values to do so
-            // .with_table_partition_cols(partition_fields)
-            ;
-
-        //TODO: (ARROW NATIVE) - predicate pushdown??
-        // builder = builder.with_predicate(filter);
-
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config)));
+        let scan = init_datasource_exec(
+            required_schema,
+            None,
+            None,
+            None,
+            object_store_url,
+            file_groups,
+            None,
+            None,
+            session_timezone.as_str(),
+        )?;
 
         let ctx = TaskContext::default();
         let partition_index: usize = 0;
-        batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
-
-        // EXPERIMENTAL - END
+        let batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
 
         let ctx = BatchContext {
             runtime,
diff --git a/native/core/src/parquet/parquet_exec.rs 
b/native/core/src/parquet/parquet_exec.rs
new file mode 100644
index 000000000..85a3d023c
--- /dev/null
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::operators::ExecutionError;
+use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+use arrow::datatypes::{Field, SchemaRef};
+use datafusion::config::TableParquetOptions;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, 
ParquetSource};
+use datafusion::datasource::source::DataSourceExec;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::physical_expr::expressions::BinaryExpr;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_spark_expr::EvalMode;
+use itertools::Itertools;
+use std::sync::Arc;
+
+/// Initializes a DataSourceExec plan with a ParquetSource. This may be used 
by either the
+/// `native_datafusion` scan or the `native_iceberg_compat` scan.
+///
+///   `required_schema`: Schema to be projected by the scan.
+///
+///   `data_schema`: Schema of the underlying data. It is optional and, if 
provided, is used
+/// instead of `required_schema` to initialize the file scan
+///
+///   `partition_schema` and `partition_fields` are optional. If 
`partition_schema` is specified,
+/// then `partition_fields` must also be specified
+///
+///   `object_store_url`: Url to read data from
+///
+///   `file_groups`: A collection of groups of `PartitionedFiles` that are to 
be read by the scan
+///
+///   `projection_vector`: A vector of the indexes in the schema of the fields 
to be projected
+///
+///   `data_filters`: Any predicate that must be applied to the data returned 
by the scan. If
+/// specified, then `data_schema` must also be specified.
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn init_datasource_exec(
+    required_schema: SchemaRef,
+    data_schema: Option<SchemaRef>,
+    partition_schema: Option<SchemaRef>,
+    partition_fields: Option<Vec<Field>>,
+    object_store_url: ObjectStoreUrl,
+    file_groups: Vec<Vec<PartitionedFile>>,
+    projection_vector: Option<Vec<usize>>,
+    data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
+    session_timezone: &str,
+) -> Result<Arc<DataSourceExec>, ExecutionError> {
+    let (table_parquet_options, spark_parquet_options) = 
get_options(session_timezone);
+    let mut parquet_source = 
ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
+        Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
+    );
+    // Create a conjunctive form of the vector because ParquetExecBuilder takes
+    // a single expression
+    if let Some(data_filters) = data_filters {
+        let cnf_data_filters = data_filters.clone().into_iter().reduce(|left, 
right| {
+            Arc::new(BinaryExpr::new(
+                left,
+                datafusion::logical_expr::Operator::And,
+                right,
+            ))
+        });
+
+        if let (Some(filter), Some(data_schema)) = (cnf_data_filters, 
&data_schema) {
+            parquet_source = 
parquet_source.with_predicate(Arc::clone(data_schema), filter);
+        }
+    }
+    let file_scan_config = match (data_schema, projection_vector, 
partition_fields) {
+        (Some(data_schema), Some(projection_vector), Some(partition_fields)) 
=> get_file_config(
+            data_schema,
+            partition_schema,
+            file_groups,
+            object_store_url,
+            Arc::new(parquet_source),
+        )
+        .with_projection(Some(projection_vector))
+        .with_table_partition_cols(partition_fields),
+        _ => get_file_config(
+            required_schema,
+            partition_schema,
+            file_groups,
+            object_store_url,
+            Arc::new(parquet_source),
+        ),
+    };
+
+    Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
+}
+
+fn get_options(session_timezone: &str) -> (TableParquetOptions, 
SparkParquetOptions) {
+    let mut table_parquet_options = TableParquetOptions::new();
+    // TODO: Maybe these are configs?
+    table_parquet_options.global.pushdown_filters = true;
+    table_parquet_options.global.reorder_filters = true;
+    let mut spark_parquet_options =
+        SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
+    spark_parquet_options.allow_cast_unsigned_ints = true;
+    (table_parquet_options, spark_parquet_options)
+}
+
+fn get_file_config(
+    schema: SchemaRef,
+    partition_schema: Option<SchemaRef>,
+    file_groups: Vec<Vec<PartitionedFile>>,
+    object_store_url: ObjectStoreUrl,
+    file_source: Arc<dyn FileSource>,
+) -> FileScanConfig {
+    match partition_schema {
+        Some(partition_schema) => {
+            let partition_fields: Vec<Field> = partition_schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    Field::new(field.name(), field.data_type().clone(), 
field.is_nullable())
+                })
+                .collect_vec();
+            FileScanConfig::new(object_store_url, Arc::clone(&schema), 
file_source)
+                .with_file_groups(file_groups)
+                .with_table_partition_cols(partition_fields)
+        }
+        _ => FileScanConfig::new(object_store_url, Arc::clone(&schema), 
file_source)
+            .with_file_groups(file_groups),
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to