This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 3c1fc06c4 fix: remove code duplication in native_datafusion and
native_iceberg_compat implementations (#1443)
3c1fc06c4 is described below
commit 3c1fc06c465f668800d747f3feb6295340c3329b
Author: Parth Chandra <[email protected]>
AuthorDate: Wed Mar 19 09:42:38 2025 -0700
fix: remove code duplication in native_datafusion and native_iceberg_compat
implementations (#1443)
* fix: remove code duplication in native_datafusion and
native_iceberg_compat implementations
---
native/core/src/execution/planner.rs | 174 +++++++++++++-------------------
native/core/src/parquet/mod.rs | 91 +++++++----------
native/core/src/parquet/parquet_exec.rs | 139 +++++++++++++++++++++++++
3 files changed, 248 insertions(+), 156 deletions(-)
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 87c8fc4b2..4c3a0fde9 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -72,19 +72,17 @@ use datafusion::{
};
use datafusion_comet_spark_expr::{create_comet_physical_fun,
create_negate_expr};
+use crate::execution::operators::ExecutionError::GeneralError;
use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::{prepare_object_store, SparkParquetOptions};
-use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
-use datafusion::common::config::TableParquetOptions;
+use crate::parquet::parquet_exec::init_datasource_exec;
+use crate::parquet::parquet_support::prepare_object_store;
use datafusion::common::scalar::ScalarStructBuilder;
use datafusion::common::{
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
TreeNodeRewriter},
JoinType as DFJoinType, ScalarValue,
};
use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
-use datafusion::datasource::source::DataSourceExec;
use datafusion::functions_nested::array_has::ArrayHas;
use
datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression;
use datafusion::logical_expr::{
@@ -94,8 +92,10 @@ use datafusion::logical_expr::{
use datafusion::physical_expr::expressions::{Literal, StatsType};
use datafusion::physical_expr::window::WindowExpr;
use datafusion::physical_expr::LexOrdering;
+
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
+use datafusion_comet_proto::spark_operator::SparkFilePartition;
use datafusion_comet_proto::{
spark_expression::{
self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct,
literal::Value, AggExpr,
@@ -181,6 +181,60 @@ impl PhysicalPlanner {
}
}
+ /// get DataFusion PartitionedFiles from a Spark FilePartition
+ fn get_partitioned_files(
+ &self,
+ partition: &SparkFilePartition,
+ ) -> Result<Vec<PartitionedFile>, ExecutionError> {
+ let mut files = Vec::with_capacity(partition.partitioned_file.len());
+ partition.partitioned_file.iter().try_for_each(|file| {
+ assert!(file.start + file.length <= file.file_size);
+
+ let mut partitioned_file = PartitionedFile::new_with_range(
+ String::new(), // Dummy file path.
+ file.file_size as u64,
+ file.start,
+ file.start + file.length,
+ );
+
+ // Spark sends the path over as URL-encoded, parse that first.
+ let url =
+ Url::parse(file.file_path.as_ref()).map_err(|e| GeneralError(e.to_string()))?;
+ // Convert that to a Path object to use in the PartitionedFile.
+ let path = Path::from_url_path(url.path()).map_err(|e| GeneralError(e.to_string()))?;
+ partitioned_file.object_meta.location = path;
+
+ // Process partition values
+ // Create an empty input schema for partition values because they are all literals.
+ let empty_schema = Arc::new(Schema::empty());
+ let partition_values: Result<Vec<_>, _> = file
+ .partition_values
+ .iter()
+ .map(|partition_value| {
+ let literal =
+ self.create_expr(partition_value, Arc::<Schema>::clone(&empty_schema))?;
+ literal
+ .as_any()
+ .downcast_ref::<DataFusionLiteral>()
+ .ok_or_else(|| {
+ ExecutionError::GeneralError(
+ "Expected literal of partition value".to_string(),
+ )
+ })
+ .map(|literal| literal.value().clone())
+ })
+ .collect();
+ let partition_values = partition_values?;
+
+ partitioned_file.partition_values = partition_values;
+
+ files.push(partitioned_file);
+ Ok::<(), ExecutionError>(())
+ })?;
+
+ Ok(files)
+ }
+
/// Create a DataFusion physical expression from Spark physical expression
fn create_expr(
&self,
@@ -1195,19 +1249,6 @@ impl PhysicalPlanner {
.map(|expr| self.create_expr(expr,
Arc::clone(&required_schema)))
.collect();
- // Create a conjunctive form of the vector because ParquetExecBuilder takes
- // a single expression
- let data_filters = data_filters?;
- let cnf_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
- Arc::new(BinaryExpr::new(
- left,
- datafusion::logical_expr::Operator::And,
- right,
- ))
- });
-
- // By default, local FS object store registered
- // if `hdfs` feature enabled then HDFS file object store registered
// Get one file from the list of files
let one_file = scan
.file_partitions
@@ -1224,53 +1265,7 @@ impl PhysicalPlanner {
let mut file_groups: Vec<Vec<PartitionedFile>> =
Vec::with_capacity(partition_count);
scan.file_partitions.iter().try_for_each(|partition| {
- let mut files = Vec::with_capacity(partition.partitioned_file.len());
- partition.partitioned_file.iter().try_for_each(|file| {
- assert!(file.start + file.length <= file.file_size);
-
- let mut partitioned_file = PartitionedFile::new_with_range(
- String::new(), // Dummy file path.
- file.file_size as u64,
- file.start,
- file.start + file.length,
- );
-
- // Spark sends the path over as URL-encoded, parse that first.
- let url = Url::parse(file.file_path.as_ref()).unwrap();
- // Convert that to a Path object to use in the PartitionedFile.
- let path = Path::from_url_path(url.path()).unwrap();
- partitioned_file.object_meta.location = path;
-
- // Process partition values
- // Create an empty input schema for partition values because they are all literals.
- let empty_schema = Arc::new(Schema::empty());
- let partition_values: Result<Vec<_>, _> = file
- .partition_values
- .iter()
- .map(|partition_value| {
- let literal = self.create_expr(
- partition_value,
- Arc::<Schema>::clone(&empty_schema),
- )?;
- literal
- .as_any()
- .downcast_ref::<DataFusionLiteral>()
- .ok_or_else(|| {
- ExecutionError::GeneralError(
- "Expected literal of partition value".to_string(),
- )
- })
- .map(|literal| literal.value().clone())
- })
- .collect();
- let partition_values = partition_values?;
-
- partitioned_file.partition_values = partition_values;
-
- files.push(partitioned_file);
- Ok::<(), ExecutionError>(())
- })?;
-
+ let files = self.get_partitioned_files(partition)?;
file_groups.push(files);
Ok::<(), ExecutionError>(())
})?;
@@ -1284,47 +1279,20 @@ impl PhysicalPlanner {
Field::new(field.name(), field.data_type().clone(),
field.is_nullable())
})
.collect_vec();
-
- let mut table_parquet_options = TableParquetOptions::new();
- // TODO: Maybe these are configs?
- table_parquet_options.global.pushdown_filters = true;
- table_parquet_options.global.reorder_filters = true;
-
- let mut spark_parquet_options = SparkParquetOptions::new(
- EvalMode::Legacy,
- scan.session_timezone.as_str(),
- false,
- );
- spark_parquet_options.allow_cast_unsigned_ints = true;
-
- let mut parquet_source = ParquetSource::new(table_parquet_options)
- .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
- spark_parquet_options,
- )));
-
- if let Some(filter) = cnf_data_filters {
- parquet_source =
- parquet_source.with_predicate(Arc::clone(&data_schema), filter);
- }
-
- let mut file_scan_config = FileScanConfig::new(
+ let scan = init_datasource_exec(
+ required_schema,
+ Some(data_schema),
+ Some(partition_schema),
+ Some(partition_fields),
object_store_url,
- Arc::clone(&data_schema),
- Arc::new(parquet_source),
- )
- .with_file_groups(file_groups)
- .with_table_partition_cols(partition_fields);
-
- assert_eq!(
- projection_vector.len(),
- required_schema.fields.len() + partition_schema.fields.len()
- );
- file_scan_config = file_scan_config.with_projection(Some(projection_vector));
-
- let scan = DataSourceExec::new(Arc::new(file_scan_config));
+ file_groups,
+ Some(projection_vector),
+ Some(data_filters?),
+ scan.session_timezone.as_str(),
+ )?;
Ok((
vec![],
- Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
+ Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
))
}
OpStruct::Scan(scan) => {
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 882a9ed6d..17109d31b 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -21,6 +21,7 @@ pub use mutable_vector::*;
#[macro_use]
pub mod util;
+pub mod parquet_exec;
pub mod parquet_support;
pub mod read;
pub mod schema_adapter;
@@ -46,23 +47,21 @@ use self::util::jni::TypePromotionInfo;
use crate::execution::operators::ExecutionError;
use crate::execution::utils::SparkArrowConvert;
use crate::parquet::data_type::AsBytes;
-use crate::parquet::parquet_support::{prepare_object_store, SparkParquetOptions};
-use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+use crate::parquet::parquet_exec::init_datasource_exec;
+use crate::parquet::parquet_support::prepare_object_store;
use arrow::array::{Array, RecordBatch};
use arrow::buffer::{Buffer, MutableBuffer};
-use datafusion::common::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
-use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
-use datafusion_comet_spark_expr::EvalMode;
use futures::{poll, StreamExt};
use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray,
JString, ReleaseMode};
use jni::sys::jstring;
+use object_store::path::Path;
use read::ColumnReader;
use util::jni::{convert_column_descriptor, convert_encoding,
deserialize_schema};
+
/// Parquet read context maintained across multiple JNI calls.
struct Context {
pub column_reader: ColumnReader,
@@ -620,12 +619,21 @@ fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext, CometErr
}
}
-/*
-#[inline]
-fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut ParquetRecordBatchReader, CometError> {
- Ok(&mut get_batch_context(handle)?.batch_reader.unwrap())
+fn get_file_groups_single_file(
+ path: &Path,
+ file_size: u64,
+ start: i64,
+ length: i64,
+) -> Vec<Vec<PartitionedFile>> {
+ let mut partitioned_file = PartitionedFile::new_with_range(
+ String::new(), // Dummy file path. We will override this with our path so that url encoding does not occur
+ file_size,
+ start,
+ start + length,
+ );
+ partitioned_file.object_meta.location = (*path).clone();
+ vec![vec![partitioned_file]]
}
-*/
/// # Safety
/// This function is inherently unsafe since it deals with raw pointers passed
from JNI.
@@ -645,65 +653,42 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
.get_string(&JString::from_raw(file_path))
.unwrap()
.into();
- let batch_stream: Option<SendableRecordBatchStream>;
- // TODO: (ARROW NATIVE) Use the common global runtime
+
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let session_ctx = SessionContext::new();
+
let (object_store_url, object_store_path) =
prepare_object_store(session_ctx.runtime_env(), path.clone())?;
- // EXPERIMENTAL - BEGIN
- // TODO: (ARROW NATIVE) - Remove code duplication between this and POC 1
- // copy the input on-heap buffer to native
let required_schema_array = JByteArray::from_raw(required_schema);
let required_schema_buffer =
env.convert_byte_array(&required_schema_array)?;
- let required_schema_arrow = deserialize_schema(required_schema_buffer.as_bytes())?;
- let mut partitioned_file = PartitionedFile::new_with_range(
- String::new(), // Dummy file path. We will override this with our path so that url encoding does not occur
- file_size as u64,
- start,
- start + length,
- );
- partitioned_file.object_meta.location = object_store_path;
+ let required_schema = Arc::new(deserialize_schema(required_schema_buffer.as_bytes())?);
+
+ let file_groups =
+ get_file_groups_single_file(&object_store_path, file_size as u64, start, length);
+
let session_timezone: String = env
.get_string(&JString::from_raw(session_timezone))
.unwrap()
.into();
- let mut spark_parquet_options =
- SparkParquetOptions::new(EvalMode::Legacy, session_timezone.as_str(), false);
- spark_parquet_options.allow_cast_unsigned_ints = true;
-
- let mut table_parquet_options = TableParquetOptions::new();
- // TODO: Maybe these are configs?
- table_parquet_options.global.pushdown_filters = true;
- table_parquet_options.global.reorder_filters = true;
-
- let parquet_source = ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
- Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
- );
-
- // We build the file scan config with the *required* schema so that the reader knows
- // the output schema we want
- let file_scan_config = FileScanConfig::new(object_store_url, Arc::new(required_schema_arrow), Arc::new(parquet_source))
- .with_file(partitioned_file)
- // TODO: (ARROW NATIVE) - do partition columns in native
- // - will need partition schema and partition values to do so
- // .with_table_partition_cols(partition_fields)
- ;
-
- //TODO: (ARROW NATIVE) - predicate pushdown??
- // builder = builder.with_predicate(filter);
-
- let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config)));
+ let scan = init_datasource_exec(
+ required_schema,
+ None,
+ None,
+ None,
+ object_store_url,
+ file_groups,
+ None,
+ None,
+ session_timezone.as_str(),
+ )?;
let ctx = TaskContext::default();
let partition_index: usize = 0;
- batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
-
- // EXPERIMENTAL - END
+ let batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
let ctx = BatchContext {
runtime,
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
new file mode 100644
index 000000000..85a3d023c
--- /dev/null
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::operators::ExecutionError;
+use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+use arrow::datatypes::{Field, SchemaRef};
+use datafusion::config::TableParquetOptions;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
+use datafusion::datasource::source::DataSourceExec;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::physical_expr::expressions::BinaryExpr;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_spark_expr::EvalMode;
+use itertools::Itertools;
+use std::sync::Arc;
+
+/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
+/// `native_datafusion` scan or the `native_iceberg_compat` scan.
+///
+/// `required_schema`: Schema to be projected by the scan.
+///
+/// `data_schema`: Schema of the underlying data. It is optional and, if provided, is used
+/// instead of `required_schema` to initialize the file scan
+///
+/// `partition_schema` and `partition_fields` are optional. If `partition_schema` is specified,
+/// then `partition_fields` must also be specified
+///
+/// `object_store_url`: Url to read data from
+///
+/// `file_groups`: A collection of groups of `PartitionedFiles` that are to be read by the scan
+///
+/// `projection_vector`: A vector of the indexes in the schema of the fields to be projected
+///
+/// `data_filters`: Any predicate that must be applied to the data returned by the scan. If
+/// specified, then `data_schema` must also be specified.
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn init_datasource_exec(
+ required_schema: SchemaRef,
+ data_schema: Option<SchemaRef>,
+ partition_schema: Option<SchemaRef>,
+ partition_fields: Option<Vec<Field>>,
+ object_store_url: ObjectStoreUrl,
+ file_groups: Vec<Vec<PartitionedFile>>,
+ projection_vector: Option<Vec<usize>>,
+ data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
+ session_timezone: &str,
+) -> Result<Arc<DataSourceExec>, ExecutionError> {
+ let (table_parquet_options, spark_parquet_options) = get_options(session_timezone);
+ let mut parquet_source = ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
+ Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
+ );
+ // Create a conjunctive form of the vector because ParquetExecBuilder takes
+ // a single expression
+ if let Some(data_filters) = data_filters {
+ let cnf_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
+ Arc::new(BinaryExpr::new(
+ left,
+ datafusion::logical_expr::Operator::And,
+ right,
+ ))
+ });
+
+ if let (Some(filter), Some(data_schema)) = (cnf_data_filters, &data_schema) {
+ parquet_source = parquet_source.with_predicate(Arc::clone(data_schema), filter);
+ }
+ }
+ let file_scan_config = match (data_schema, projection_vector, partition_fields) {
+ (Some(data_schema), Some(projection_vector), Some(partition_fields)) => get_file_config(
+ data_schema,
+ partition_schema,
+ file_groups,
+ object_store_url,
+ Arc::new(parquet_source),
+ )
+ .with_projection(Some(projection_vector))
+ .with_table_partition_cols(partition_fields),
+ _ => get_file_config(
+ required_schema,
+ partition_schema,
+ file_groups,
+ object_store_url,
+ Arc::new(parquet_source),
+ ),
+ };
+
+ Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
+}
+
+fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOptions) {
+ let mut table_parquet_options = TableParquetOptions::new();
+ // TODO: Maybe these are configs?
+ table_parquet_options.global.pushdown_filters = true;
+ table_parquet_options.global.reorder_filters = true;
+ let mut spark_parquet_options =
+ SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
+ spark_parquet_options.allow_cast_unsigned_ints = true;
+ (table_parquet_options, spark_parquet_options)
+}
+
+fn get_file_config(
+ schema: SchemaRef,
+ partition_schema: Option<SchemaRef>,
+ file_groups: Vec<Vec<PartitionedFile>>,
+ object_store_url: ObjectStoreUrl,
+ file_source: Arc<dyn FileSource>,
+) -> FileScanConfig {
+ match partition_schema {
+ Some(partition_schema) => {
+ let partition_fields: Vec<Field> = partition_schema
+ .fields()
+ .iter()
+ .map(|field| {
+ Field::new(field.name(), field.data_type().clone(), field.is_nullable())
+ })
+ .collect_vec();
+ FileScanConfig::new(object_store_url, Arc::clone(&schema), file_source)
+ .with_file_groups(file_groups)
+ .with_table_partition_cols(partition_fields)
+ }
+ _ => FileScanConfig::new(object_store_url, Arc::clone(&schema), file_source)
+ .with_file_groups(file_groups),
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]