This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-v6.0.0-decimal-cast in repository https://gitbox.apache.org/repos/asf/auron.git
commit f7d0b24950028da920c3b6ca19ee74f018c753ca Author: zhangli20 <[email protected]> AuthorDate: Wed Dec 24 15:45:33 2025 +0800 supports parallel reading for tables with small files --- native-engine/blaze-jni-bridge/src/conf.rs | 1 + native-engine/datafusion-ext-plans/src/lib.rs | 1 + native-engine/datafusion-ext-plans/src/orc_exec.rs | 76 ++++++------ .../datafusion-ext-plans/src/parquet_exec.rs | 111 +++++++++-------- native-engine/datafusion-ext-plans/src/scan/mod.rs | 1 + .../datafusion-ext-plans/src/scan/parallel.rs | 138 +++++++++++++++++++++ .../java/org/apache/spark/sql/blaze/BlazeConf.java | 3 + 7 files changed, 242 insertions(+), 89 deletions(-) diff --git a/native-engine/blaze-jni-bridge/src/conf.rs b/native-engine/blaze-jni-bridge/src/conf.rs index 99ba7692..c8e77a02 100644 --- a/native-engine/blaze-jni-bridge/src/conf.rs +++ b/native-engine/blaze-jni-bridge/src/conf.rs @@ -53,6 +53,7 @@ define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE); define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION); define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG); define_conf!(BooleanConf, PARSE_JSON_ERROR_FALLBACK); +define_conf!(IntConf, NUM_PARALLEL_SCAN_FILES); define_conf!(StringConf, NATIVE_LOG_LEVEL); pub trait BooleanConf { diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs index 375338d2..dbde6d08 100644 --- a/native-engine/datafusion-ext-plans/src/lib.rs +++ b/native-engine/datafusion-ext-plans/src/lib.rs @@ -19,6 +19,7 @@ #![feature(get_mut_unchecked)] #![feature(portable_simd)] #![feature(ptr_as_ref_unchecked)] +#![feature(unboxed_closures)] // execution plan implementations pub mod agg; diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs index 074ae31e..3469a49f 100644 --- a/native-engine/datafusion-ext-plans/src/orc_exec.rs +++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs @@ -15,11 +15,13 @@ // specific 
language governing permissions and limitations // under the License. -use std::{any::Any, fmt, fmt::Formatter, pin::Pin, sync::Arc}; +use std::{any::Any, fmt, fmt::Formatter, sync::Arc}; use arrow::{datatypes::SchemaRef, error::ArrowError}; use blaze_jni_bridge::{ - conf, conf::BooleanConf, jni_call_static, jni_new_global_ref, jni_new_string, + conf, + conf::{BooleanConf, IntConf}, + jni_call_static, jni_new_global_ref, jni_new_string, }; use bytes::Bytes; use datafusion::{ @@ -48,7 +50,7 @@ use orc_rust::{ use crate::{ common::execution_context::ExecutionContext, - scan::{BlazeSchemaMapping, internal_file_reader::InternalFileReader}, + scan::{BlazeSchemaMapping, internal_file_reader::InternalFileReader, parallel::parallel_scan}, }; /// Execution plan for scanning one or more Orc partitions @@ -154,33 +156,43 @@ impl ExecutionPlan for OrcExec { let resource_id = jni_new_string!(&self.fs_resource_id)?; let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?; let fs_provider = Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time)); - + let metrics = self.metrics.clone(); let projection = match self.base_config.file_column_projection_indices() { Some(proj) => proj, None => (0..self.base_config.file_schema.fields().len()).collect(), }; let force_positional_evolution = conf::ORC_FORCE_POSITIONAL_EVOLUTION.value()?; - - let opener = OrcOpener { - projection, - batch_size: batch_size(), - table_schema: self.base_config.file_schema.clone(), - fs_provider, - partition_index: partition, - metrics: self.metrics.clone(), - force_positional_evolution, - }; - - let file_stream = Box::pin(FileStream::new( - &self.base_config, - partition, - opener, - exec_ctx.execution_plan_metrics(), - )?); - - let timed_stream = execute_orc_scan(file_stream, exec_ctx.clone())?; - Ok(exec_ctx.coalesce_with_default_batch_size(timed_stream)) + let exec_ctx_cloned = exec_ctx.clone(); + let create_file_scan_config = + move |file_scan_config: 
&FileScanConfig| -> Result<SendableRecordBatchStream> { + let opener = OrcOpener { + projection: projection.clone(), + batch_size: batch_size(), + table_schema: file_scan_config.file_schema.clone(), + fs_provider: fs_provider.clone(), + partition_index: partition, + metrics: metrics.clone(), + force_positional_evolution, + }; + + let file_stream = FileStream::new( + file_scan_config, + partition, + opener, + exec_ctx_cloned.execution_plan_metrics(), + )?; + Ok(Box::pin(file_stream)) + }; + + let num_parallel_scan_files = conf::NUM_PARALLEL_SCAN_FILES.value()? as usize; + let scan_stream = parallel_scan( + exec_ctx.clone(), + self.base_config.clone(), + create_file_scan_config, + num_parallel_scan_files, + )?; + Ok(exec_ctx.coalesce_with_default_batch_size(scan_stream)) } fn metrics(&self) -> Option<MetricsSet> { @@ -192,22 +204,6 @@ impl ExecutionPlan for OrcExec { } } -fn execute_orc_scan( - mut stream: Pin<Box<FileStream<OrcOpener>>>, - exec_ctx: Arc<ExecutionContext>, -) -> Result<SendableRecordBatchStream> { - Ok(exec_ctx - .clone() - .output_with_sender("OrcScan", move |sender| async move { - sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute()); - let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); - while let Some(batch) = stream.next().await.transpose()? { - sender.send(batch).await; - } - Ok(()) - })) -} - struct OrcOpener { projection: Vec<usize>, batch_size: usize, diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs index a8ab1b35..75679d99 100644 --- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs +++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs @@ -17,12 +17,14 @@ //! 
Execution plan for reading Parquet files -use std::{any::Any, fmt, fmt::Formatter, ops::Range, pin::Pin, sync::Arc}; +use std::{any::Any, fmt, fmt::Formatter, ops::Range, sync::Arc}; use arrow::datatypes::SchemaRef; use arrow_schema::DataType; use blaze_jni_bridge::{ - conf, conf::BooleanConf, jni_call_static, jni_new_global_ref, jni_new_string, + conf, + conf::{BooleanConf, IntConf}, + jni_call_static, jni_new_global_ref, jni_new_string, }; use bytes::Bytes; use datafusion::{ @@ -48,7 +50,7 @@ use datafusion::{ }; use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider}; use fmt::Debug; -use futures::{FutureExt, StreamExt, future::BoxFuture}; +use futures::{FutureExt, future::BoxFuture}; use itertools::Itertools; use object_store::ObjectMeta; use once_cell::sync::OnceCell; @@ -56,7 +58,10 @@ use parking_lot::Mutex; use crate::{ common::execution_context::ExecutionContext, - scan::{BlazeSchemaAdapterFactory, internal_file_reader::InternalFileReader}, + scan::{ + BlazeSchemaAdapterFactory, internal_file_reader::InternalFileReader, + parallel::parallel_scan, + }, }; /// Execution plan for scanning one or more Parquet partitions @@ -198,46 +203,70 @@ impl ExecutionPlan for ParquetExec { let _timer = elapsed_compute.timer(); let io_time = exec_ctx.register_timer_metric("io_time"); + let predicate = self.predicate.clone(); + let pruning_predicate = self.pruning_predicate.clone(); + let page_pruning_predicate = self.page_pruning_predicate.clone(); + let metrics = self.metrics.clone(); + // get fs object from jni bridge resource let resource_id = jni_new_string!(&self.fs_resource_id)?; let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?; let fs_provider = Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time)); + let parquet_file_reader_factory = Arc::new(FsReaderFactory::new(fs_provider.clone())); let schema_adapter_factory = Arc::new(BlazeSchemaAdapterFactory); - let projection = match 
self.base_config.file_column_projection_indices() { - Some(proj) => proj, - None => (0..self.base_config.file_schema.fields().len()).collect(), + let projection: Arc<[usize]> = match self.base_config.file_column_projection_indices() { + Some(proj) => proj.into(), + None => (0..self.base_config.file_schema.fields().len()) + .collect::<Vec<_>>() + .into(), }; let page_filtering_enabled = conf::PARQUET_ENABLE_PAGE_FILTERING.value()?; let bloom_filter_enabled = conf::PARQUET_ENABLE_BLOOM_FILTER.value()?; + let base_config = self.base_config.clone(); + + let exec_ctx_cloned = exec_ctx.clone(); + let create_file_stream_fn = + move |file_scan_config: &FileScanConfig| -> Result<SendableRecordBatchStream> { + let opener = ParquetOpener { + partition_index: partition, + projection: projection.clone(), + batch_size: batch_size(), + limit: file_scan_config.limit, + predicate: predicate.clone(), + pruning_predicate: pruning_predicate.clone(), + page_pruning_predicate: page_pruning_predicate.clone(), + table_schema: file_scan_config.file_schema.clone(), + metadata_size_hint: None, + metrics: metrics.clone(), + parquet_file_reader_factory: parquet_file_reader_factory.clone(), + pushdown_filters: page_filtering_enabled, + reorder_filters: page_filtering_enabled, + enable_page_index: page_filtering_enabled, + enable_bloom_filter: bloom_filter_enabled, + schema_adapter_factory: schema_adapter_factory.clone(), + }; + let mut file_stream = FileStream::new( + file_scan_config, + exec_ctx_cloned.partition_id(), + opener, + exec_ctx_cloned.execution_plan_metrics(), + )?; + if conf::IGNORE_CORRUPTED_FILES.value()? 
{ + file_stream = file_stream.with_on_error(OnError::Skip); + } + Ok(Box::pin(file_stream)) + }; - let opener = ParquetOpener { - partition_index: partition, - projection: Arc::from(projection), - batch_size: batch_size(), - limit: self.base_config.limit, - predicate: self.predicate.clone(), - pruning_predicate: self.pruning_predicate.clone(), - page_pruning_predicate: self.page_pruning_predicate.clone(), - table_schema: self.base_config.file_schema.clone(), - metadata_size_hint: None, - metrics: self.metrics.clone(), - parquet_file_reader_factory: Arc::new(FsReaderFactory::new(fs_provider)), - pushdown_filters: page_filtering_enabled, - reorder_filters: page_filtering_enabled, - enable_page_index: page_filtering_enabled, - enable_bloom_filter: bloom_filter_enabled, - schema_adapter_factory, - }; - - let mut file_stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?; - if conf::IGNORE_CORRUPTED_FILES.value()? { - file_stream = file_stream.with_on_error(OnError::Skip); - } - - let timed_stream = execute_parquet_scan(Box::pin(file_stream), exec_ctx.clone())?; - Ok(exec_ctx.coalesce_with_default_batch_size(timed_stream)) + let num_parallel_scan_files = conf::NUM_PARALLEL_SCAN_FILES.value()? as usize; + let scan_stream = parallel_scan( + exec_ctx.clone(), + base_config, + create_file_stream_fn, + num_parallel_scan_files, + )?; + Ok(exec_ctx.coalesce_with_default_batch_size(scan_stream)) } fn metrics(&self) -> Option<MetricsSet> { @@ -249,22 +278,6 @@ impl ExecutionPlan for ParquetExec { } } -fn execute_parquet_scan( - mut stream: Pin<Box<FileStream<ParquetOpener>>>, - exec_ctx: Arc<ExecutionContext>, -) -> Result<SendableRecordBatchStream> { - Ok(exec_ctx - .clone() - .output_with_sender("ParquetScan", move |sender| async move { - sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute()); - let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer(); - while let Some(batch) = stream.next().await.transpose()? 
{ - sender.send(batch).await; - } - Ok(()) - })) -} - #[derive(Clone)] pub struct FsReaderFactory { fs_provider: Arc<FsProvider>, diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs b/native-engine/datafusion-ext-plans/src/scan/mod.rs index ba89500b..ad4b69cd 100644 --- a/native-engine/datafusion-ext-plans/src/scan/mod.rs +++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs @@ -25,6 +25,7 @@ use datafusion::{ use datafusion_ext_commons::df_execution_err; pub mod internal_file_reader; +pub mod parallel; #[derive(Debug)] pub struct BlazeSchemaAdapterFactory; diff --git a/native-engine/datafusion-ext-plans/src/scan/parallel.rs b/native-engine/datafusion-ext-plans/src/scan/parallel.rs new file mode 100644 index 00000000..fc6f38cf --- /dev/null +++ b/native-engine/datafusion-ext-plans/src/scan/parallel.rs @@ -0,0 +1,138 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use blaze_jni_bridge::is_task_running; +use datafusion::{ + common::DataFusionError, datasource::physical_plan::FileScanConfig, error::Result, + execution::SendableRecordBatchStream, +}; +use datafusion_ext_commons::df_execution_err; +use futures_util::StreamExt; + +use crate::common::execution_context::ExecutionContext; + +pub fn parallel_scan( + exec_ctx: Arc<ExecutionContext>, + file_scan_config: FileScanConfig, + create_file_stream_fn: impl Fn(&FileScanConfig) -> Result<SendableRecordBatchStream> + + Send + + 'static, + num_parallel_files: usize, +) -> Result<SendableRecordBatchStream> { + let partition_files = file_scan_config + .file_groups + .iter() + .flatten() + .cloned() + .collect::<Vec<_>>(); + + if num_parallel_files == 0 { + return df_execution_err!("num_parallel_files must be positive, got {num_parallel_files}"); + } + + // no parallel + if num_parallel_files == 1 { + let mut file_stream = create_file_stream_fn(&file_scan_config)?; + let stream = exec_ctx + .clone() + .output_with_sender("FileScan", move |sender| async move { + let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); + let _timer = elapsed_compute.timer(); + sender.exclude_time(&elapsed_compute); + + while let Some(batch) = file_stream.next().await.transpose()?
{ + sender.send(batch).await; + } + Ok(()) + }); + return Ok(stream); + } + + let stream = exec_ctx + .clone() + .output_with_sender("FileScan", move |sender| async move { + let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); + let _timer = elapsed_compute.timer(); + sender.exclude_time(&elapsed_compute); + + let (file_streams_tx, mut file_streams_rx) = + tokio::sync::mpsc::channel(num_parallel_files - 1); + let exec_ctx_cloned = exec_ctx.clone(); + + // create file streams for each file + let mut file_streams: Vec<SendableRecordBatchStream> = partition_files + .into_iter() + .map(|partition_file| { + let mut file_scan_config = file_scan_config.clone(); + let partition_id = exec_ctx_cloned.partition_id(); + file_scan_config.file_groups = vec![vec![]; file_scan_config.file_groups.len()]; + file_scan_config.file_groups[partition_id] = vec![partition_file]; + create_file_stream_fn(&file_scan_config) + }) + .collect::<Result<_>>()?; + + // read first batch without parallelism to avoid latency of later operators + if !file_streams.is_empty() { + if let Some(batch) = file_streams[0].next().await.transpose()? { + sender.send(batch).await; + } + } + + // read rest file streams in parallel + let handle = tokio::spawn(async move { + for mut file_stream in file_streams { + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let handle = tokio::spawn(async move { + while is_task_running() + && let Some(batch) = file_stream.next().await.transpose()? 
+ { + tx.send(batch).await.or_else(|e| df_execution_err!("{e}"))?; + } + Ok::<_, DataFusionError>(()) + }); + let eager_stream = exec_ctx_cloned.output_with_sender( + "FileScan.File", + move |sender| async move { + while is_task_running() + && let Some(batch) = rx.recv().await + { + sender.send(batch).await; + } + handle.await.or_else(|e| df_execution_err!("{e}"))??; + Ok(()) + }, + ); + + file_streams_tx + .send(eager_stream) + .await + .or_else(|e| df_execution_err!("{e}"))?; + } + Ok::<_, DataFusionError>(()) + }); + + while let Some(mut stream) = file_streams_rx.recv().await { + while let Some(batch) = stream.next().await.transpose()? { + sender.send(batch).await; + } + } + handle + .await + .or_else(|e| df_execution_err!("failed to join task handle: {e}"))??; + Ok(()) + }); + Ok(stream) +} diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java index e655f79e..5136b8b5 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java @@ -112,6 +112,9 @@ public enum BlazeConf { ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", false), + // number of parallel scan files + NUM_PARALLEL_SCAN_FILES("spark.blaze.numParallelScanFiles", 4), + NATIVE_LOG_LEVEL("spark.blaze.native.log.level", "info"); public final String key;
