This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-decimal-cast
in repository https://gitbox.apache.org/repos/asf/auron.git

commit f7d0b24950028da920c3b6ca19ee74f018c753ca
Author: zhangli20 <[email protected]>
AuthorDate: Wed Dec 24 15:45:33 2025 +0800

    supports parallel reading for tables with small files
---
 native-engine/blaze-jni-bridge/src/conf.rs         |   1 +
 native-engine/datafusion-ext-plans/src/lib.rs      |   1 +
 native-engine/datafusion-ext-plans/src/orc_exec.rs |  76 ++++++------
 .../datafusion-ext-plans/src/parquet_exec.rs       | 111 +++++++++--------
 native-engine/datafusion-ext-plans/src/scan/mod.rs |   1 +
 .../datafusion-ext-plans/src/scan/parallel.rs      | 138 +++++++++++++++++++++
 .../java/org/apache/spark/sql/blaze/BlazeConf.java |   3 +
 7 files changed, 242 insertions(+), 89 deletions(-)

diff --git a/native-engine/blaze-jni-bridge/src/conf.rs 
b/native-engine/blaze-jni-bridge/src/conf.rs
index 99ba7692..c8e77a02 100644
--- a/native-engine/blaze-jni-bridge/src/conf.rs
+++ b/native-engine/blaze-jni-bridge/src/conf.rs
@@ -53,6 +53,7 @@ define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE);
 define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION);
 define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG);
 define_conf!(BooleanConf, PARSE_JSON_ERROR_FALLBACK);
+define_conf!(IntConf, NUM_PARALLEL_SCAN_FILES);
 define_conf!(StringConf, NATIVE_LOG_LEVEL);
 
 pub trait BooleanConf {
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs 
b/native-engine/datafusion-ext-plans/src/lib.rs
index 375338d2..dbde6d08 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -19,6 +19,7 @@
 #![feature(get_mut_unchecked)]
 #![feature(portable_simd)]
 #![feature(ptr_as_ref_unchecked)]
+#![feature(unboxed_closures)]
 
 // execution plan implementations
 pub mod agg;
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs 
b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index 074ae31e..3469a49f 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{any::Any, fmt, fmt::Formatter, pin::Pin, sync::Arc};
+use std::{any::Any, fmt, fmt::Formatter, sync::Arc};
 
 use arrow::{datatypes::SchemaRef, error::ArrowError};
 use blaze_jni_bridge::{
-    conf, conf::BooleanConf, jni_call_static, jni_new_global_ref, 
jni_new_string,
+    conf,
+    conf::{BooleanConf, IntConf},
+    jni_call_static, jni_new_global_ref, jni_new_string,
 };
 use bytes::Bytes;
 use datafusion::{
@@ -48,7 +50,7 @@ use orc_rust::{
 
 use crate::{
     common::execution_context::ExecutionContext,
-    scan::{BlazeSchemaMapping, internal_file_reader::InternalFileReader},
+    scan::{BlazeSchemaMapping, internal_file_reader::InternalFileReader, 
parallel::parallel_scan},
 };
 
 /// Execution plan for scanning one or more Orc partitions
@@ -154,33 +156,43 @@ impl ExecutionPlan for OrcExec {
         let resource_id = jni_new_string!(&self.fs_resource_id)?;
         let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) 
-> JObject)?;
         let fs_provider = 
Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time));
-
+        let metrics = self.metrics.clone();
         let projection = match 
self.base_config.file_column_projection_indices() {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
 
         let force_positional_evolution = 
conf::ORC_FORCE_POSITIONAL_EVOLUTION.value()?;
-
-        let opener = OrcOpener {
-            projection,
-            batch_size: batch_size(),
-            table_schema: self.base_config.file_schema.clone(),
-            fs_provider,
-            partition_index: partition,
-            metrics: self.metrics.clone(),
-            force_positional_evolution,
-        };
-
-        let file_stream = Box::pin(FileStream::new(
-            &self.base_config,
-            partition,
-            opener,
-            exec_ctx.execution_plan_metrics(),
-        )?);
-
-        let timed_stream = execute_orc_scan(file_stream, exec_ctx.clone())?;
-        Ok(exec_ctx.coalesce_with_default_batch_size(timed_stream))
+        let exec_ctx_cloned = exec_ctx.clone();
+        let create_file_scan_config =
+            move |file_scan_config: &FileScanConfig| -> 
Result<SendableRecordBatchStream> {
+                let opener = OrcOpener {
+                    projection: projection.clone(),
+                    batch_size: batch_size(),
+                    table_schema: file_scan_config.file_schema.clone(),
+                    fs_provider: fs_provider.clone(),
+                    partition_index: partition,
+                    metrics: metrics.clone(),
+                    force_positional_evolution,
+                };
+
+                let file_stream = FileStream::new(
+                    file_scan_config,
+                    partition,
+                    opener,
+                    exec_ctx_cloned.execution_plan_metrics(),
+                )?;
+                Ok(Box::pin(file_stream))
+            };
+
+        let num_parallel_scan_files = conf::NUM_PARALLEL_SCAN_FILES.value()? 
as usize;
+        let scan_stream = parallel_scan(
+            exec_ctx.clone(),
+            self.base_config.clone(),
+            create_file_scan_config,
+            num_parallel_scan_files,
+        )?;
+        Ok(exec_ctx.coalesce_with_default_batch_size(scan_stream))
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
@@ -192,22 +204,6 @@ impl ExecutionPlan for OrcExec {
     }
 }
 
-fn execute_orc_scan(
-    mut stream: Pin<Box<FileStream<OrcOpener>>>,
-    exec_ctx: Arc<ExecutionContext>,
-) -> Result<SendableRecordBatchStream> {
-    Ok(exec_ctx
-        .clone()
-        .output_with_sender("OrcScan", move |sender| async move {
-            sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute());
-            let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer();
-            while let Some(batch) = stream.next().await.transpose()? {
-                sender.send(batch).await;
-            }
-            Ok(())
-        }))
-}
-
 struct OrcOpener {
     projection: Vec<usize>,
     batch_size: usize,
diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs 
b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
index a8ab1b35..75679d99 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -17,12 +17,14 @@
 
 //! Execution plan for reading Parquet files
 
-use std::{any::Any, fmt, fmt::Formatter, ops::Range, pin::Pin, sync::Arc};
+use std::{any::Any, fmt, fmt::Formatter, ops::Range, sync::Arc};
 
 use arrow::datatypes::SchemaRef;
 use arrow_schema::DataType;
 use blaze_jni_bridge::{
-    conf, conf::BooleanConf, jni_call_static, jni_new_global_ref, 
jni_new_string,
+    conf,
+    conf::{BooleanConf, IntConf},
+    jni_call_static, jni_new_global_ref, jni_new_string,
 };
 use bytes::Bytes;
 use datafusion::{
@@ -48,7 +50,7 @@ use datafusion::{
 };
 use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider};
 use fmt::Debug;
-use futures::{FutureExt, StreamExt, future::BoxFuture};
+use futures::{FutureExt, future::BoxFuture};
 use itertools::Itertools;
 use object_store::ObjectMeta;
 use once_cell::sync::OnceCell;
@@ -56,7 +58,10 @@ use parking_lot::Mutex;
 
 use crate::{
     common::execution_context::ExecutionContext,
-    scan::{BlazeSchemaAdapterFactory, 
internal_file_reader::InternalFileReader},
+    scan::{
+        BlazeSchemaAdapterFactory, internal_file_reader::InternalFileReader,
+        parallel::parallel_scan,
+    },
 };
 
 /// Execution plan for scanning one or more Parquet partitions
@@ -198,46 +203,70 @@ impl ExecutionPlan for ParquetExec {
         let _timer = elapsed_compute.timer();
         let io_time = exec_ctx.register_timer_metric("io_time");
 
+        let predicate = self.predicate.clone();
+        let pruning_predicate = self.pruning_predicate.clone();
+        let page_pruning_predicate = self.page_pruning_predicate.clone();
+        let metrics = self.metrics.clone();
+
         // get fs object from jni bridge resource
         let resource_id = jni_new_string!(&self.fs_resource_id)?;
         let fs = jni_call_static!(JniBridge.getResource(resource_id.as_obj()) 
-> JObject)?;
         let fs_provider = 
Arc::new(FsProvider::new(jni_new_global_ref!(fs.as_obj())?, &io_time));
+        let parquet_file_reader_factory = 
Arc::new(FsReaderFactory::new(fs_provider.clone()));
 
         let schema_adapter_factory = Arc::new(BlazeSchemaAdapterFactory);
-        let projection = match 
self.base_config.file_column_projection_indices() {
-            Some(proj) => proj,
-            None => (0..self.base_config.file_schema.fields().len()).collect(),
+        let projection: Arc<[usize]> = match 
self.base_config.file_column_projection_indices() {
+            Some(proj) => proj.into(),
+            None => (0..self.base_config.file_schema.fields().len())
+                .collect::<Vec<_>>()
+                .into(),
         };
 
         let page_filtering_enabled = 
conf::PARQUET_ENABLE_PAGE_FILTERING.value()?;
         let bloom_filter_enabled = conf::PARQUET_ENABLE_BLOOM_FILTER.value()?;
+        let base_config = self.base_config.clone();
+
+        let exec_ctx_cloned = exec_ctx.clone();
+        let create_file_stream_fn =
+            move |file_scan_config: &FileScanConfig| -> 
Result<SendableRecordBatchStream> {
+                let opener = ParquetOpener {
+                    partition_index: partition,
+                    projection: projection.clone(),
+                    batch_size: batch_size(),
+                    limit: file_scan_config.limit,
+                    predicate: predicate.clone(),
+                    pruning_predicate: pruning_predicate.clone(),
+                    page_pruning_predicate: page_pruning_predicate.clone(),
+                    table_schema: file_scan_config.file_schema.clone(),
+                    metadata_size_hint: None,
+                    metrics: metrics.clone(),
+                    parquet_file_reader_factory: 
parquet_file_reader_factory.clone(),
+                    pushdown_filters: page_filtering_enabled,
+                    reorder_filters: page_filtering_enabled,
+                    enable_page_index: page_filtering_enabled,
+                    enable_bloom_filter: bloom_filter_enabled,
+                    schema_adapter_factory: schema_adapter_factory.clone(),
+                };
+                let mut file_stream = FileStream::new(
+                    file_scan_config,
+                    exec_ctx_cloned.partition_id(),
+                    opener,
+                    exec_ctx_cloned.execution_plan_metrics(),
+                )?;
+                if conf::IGNORE_CORRUPTED_FILES.value()? {
+                    file_stream = file_stream.with_on_error(OnError::Skip);
+                }
+                Ok(Box::pin(file_stream))
+            };
 
-        let opener = ParquetOpener {
-            partition_index: partition,
-            projection: Arc::from(projection),
-            batch_size: batch_size(),
-            limit: self.base_config.limit,
-            predicate: self.predicate.clone(),
-            pruning_predicate: self.pruning_predicate.clone(),
-            page_pruning_predicate: self.page_pruning_predicate.clone(),
-            table_schema: self.base_config.file_schema.clone(),
-            metadata_size_hint: None,
-            metrics: self.metrics.clone(),
-            parquet_file_reader_factory: 
Arc::new(FsReaderFactory::new(fs_provider)),
-            pushdown_filters: page_filtering_enabled,
-            reorder_filters: page_filtering_enabled,
-            enable_page_index: page_filtering_enabled,
-            enable_bloom_filter: bloom_filter_enabled,
-            schema_adapter_factory,
-        };
-
-        let mut file_stream = FileStream::new(&self.base_config, partition, 
opener, &self.metrics)?;
-        if conf::IGNORE_CORRUPTED_FILES.value()? {
-            file_stream = file_stream.with_on_error(OnError::Skip);
-        }
-
-        let timed_stream = execute_parquet_scan(Box::pin(file_stream), 
exec_ctx.clone())?;
-        Ok(exec_ctx.coalesce_with_default_batch_size(timed_stream))
+        let num_parallel_scan_files = conf::NUM_PARALLEL_SCAN_FILES.value()? 
as usize;
+        let scan_stream = parallel_scan(
+            exec_ctx.clone(),
+            base_config,
+            create_file_stream_fn,
+            num_parallel_scan_files,
+        )?;
+        Ok(exec_ctx.coalesce_with_default_batch_size(scan_stream))
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
@@ -249,22 +278,6 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-fn execute_parquet_scan(
-    mut stream: Pin<Box<FileStream<ParquetOpener>>>,
-    exec_ctx: Arc<ExecutionContext>,
-) -> Result<SendableRecordBatchStream> {
-    Ok(exec_ctx
-        .clone()
-        .output_with_sender("ParquetScan", move |sender| async move {
-            sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute());
-            let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer();
-            while let Some(batch) = stream.next().await.transpose()? {
-                sender.send(batch).await;
-            }
-            Ok(())
-        }))
-}
-
 #[derive(Clone)]
 pub struct FsReaderFactory {
     fs_provider: Arc<FsProvider>,
diff --git a/native-engine/datafusion-ext-plans/src/scan/mod.rs 
b/native-engine/datafusion-ext-plans/src/scan/mod.rs
index ba89500b..ad4b69cd 100644
--- a/native-engine/datafusion-ext-plans/src/scan/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/scan/mod.rs
@@ -25,6 +25,7 @@ use datafusion::{
 use datafusion_ext_commons::df_execution_err;
 
 pub mod internal_file_reader;
+pub mod parallel;
 
 #[derive(Debug)]
 pub struct BlazeSchemaAdapterFactory;
diff --git a/native-engine/datafusion-ext-plans/src/scan/parallel.rs 
b/native-engine/datafusion-ext-plans/src/scan/parallel.rs
new file mode 100644
index 00000000..fc6f38cf
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/scan/parallel.rs
@@ -0,0 +1,138 @@
+// Copyright 2022 The Blaze Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use blaze_jni_bridge::is_task_running;
+use datafusion::{
+    common::DataFusionError, datasource::physical_plan::FileScanConfig, 
error::Result,
+    execution::SendableRecordBatchStream,
+};
+use datafusion_ext_commons::df_execution_err;
+use futures_util::StreamExt;
+
+use crate::common::execution_context::ExecutionContext;
+
+pub fn parallel_scan(
+    exec_ctx: Arc<ExecutionContext>,
+    file_scan_config: FileScanConfig,
+    create_file_stream_fn: impl Fn(&FileScanConfig) -> 
Result<SendableRecordBatchStream>
+    + Send
+    + 'static,
+    num_parallel_files: usize,
+) -> Result<SendableRecordBatchStream> {
+    let partition_files = file_scan_config
+        .file_groups
+        .iter()
+        .flatten()
+        .cloned()
+        .collect::<Vec<_>>();
+
+    if num_parallel_files <= 0 {
+        return df_execution_err!("num_parallel_files must be positive, got 
{num_parallel_files}");
+    }
+
+    // no parallel
+    if num_parallel_files == 1 {
+        let mut file_stream = create_file_stream_fn(&file_scan_config)?;
+        let stream = exec_ctx
+            .clone()
+            .output_with_sender("FileScan", move |sender| async move {
+                let elapsed_compute = 
exec_ctx.baseline_metrics().elapsed_compute().clone();
+                let _timer = elapsed_compute.timer();
+                sender.exclude_time(&elapsed_compute);
+
+                while let Some(batch) = file_stream.next().await.transpose()? {
+                    sender.send(batch).await;
+                }
+                Ok(())
+            });
+        return Ok(stream);
+    }
+
+    let stream = exec_ctx
+        .clone()
+        .output_with_sender("FileScan", move |sender| async move {
+            let elapsed_compute = 
exec_ctx.baseline_metrics().elapsed_compute().clone();
+            let _timer = elapsed_compute.timer();
+            sender.exclude_time(&elapsed_compute);
+
+            let (file_streams_tx, mut file_streams_rx) =
+                tokio::sync::mpsc::channel(num_parallel_files - 1);
+            let exec_ctx_cloned = exec_ctx.clone();
+
+            // create file streams for each file
+            let mut file_streams: Vec<SendableRecordBatchStream> = 
partition_files
+                .into_iter()
+                .map(|partition_file| {
+                    let mut file_scan_config = file_scan_config.clone();
+                    let partition_id = exec_ctx_cloned.partition_id();
+                    file_scan_config.file_groups = vec![vec![]; 
file_scan_config.file_groups.len()];
+                    file_scan_config.file_groups[partition_id] = 
vec![partition_file];
+                    create_file_stream_fn(&file_scan_config)
+                })
+                .collect::<Result<_>>()?;
+
+            // read first batch without parallelism to avoid latency of later 
operators
+            if !file_streams.is_empty() {
+                if let Some(batch) = file_streams[0].next().await.transpose()? 
{
+                    sender.send(batch).await;
+                }
+            }
+
+            // read rest file streams in parallel
+            let handle = tokio::spawn(async move {
+                for mut file_stream in file_streams {
+                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+                    let handle = tokio::spawn(async move {
+                        while is_task_running()
+                            && let Some(batch) = 
file_stream.next().await.transpose()?
+                        {
+                            tx.send(batch).await.or_else(|e| 
df_execution_err!("{e}"))?;
+                        }
+                        Ok::<_, DataFusionError>(())
+                    });
+                    let eager_stream = exec_ctx_cloned.output_with_sender(
+                        "FileScan.File",
+                        move |sender| async move {
+                            while is_task_running()
+                                && let Some(batch) = rx.recv().await
+                            {
+                                sender.send(batch).await;
+                            }
+                            handle.await.or_else(|e| 
df_execution_err!("{e}"))??;
+                            Ok(())
+                        },
+                    );
+
+                    file_streams_tx
+                        .send(eager_stream)
+                        .await
+                        .or_else(|e| df_execution_err!("{e}"))?;
+                }
+                Ok::<_, DataFusionError>(())
+            });
+
+            while let Some(mut stream) = file_streams_rx.recv().await {
+                while let Some(batch) = stream.next().await.transpose()? {
+                    sender.send(batch).await;
+                }
+            }
+            handle
+                .await
+                .or_else(|e| df_execution_err!("failed to join task handle: 
{e}"))??;
+            Ok(())
+        });
+    Ok(stream)
+}
diff --git 
a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java 
b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
index e655f79e..5136b8b5 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
@@ -112,6 +112,9 @@ public enum BlazeConf {
 
     
ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", 
false),
 
+    // number of parallel scan files
+    NUM_PARALLEL_SCAN_FILES("spark.blaze.numParallelScanFiles", 4),
+
     NATIVE_LOG_LEVEL("spark.blaze.native.log.level", "info");
 
     public final String key;

Reply via email to