This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new c5e78b6b5 feat: respect `batchSize/workerThreads/blockingThreads` configurations for native_iceberg_compat scan (#1587)
c5e78b6b5 is described below
commit c5e78b6b59778f0429f0fc8157c6a959bfd9d4c3
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Apr 3 03:09:02 2025 +0800
feat: respect `batchSize/workerThreads/blockingThreads` configurations for native_iceberg_compat scan (#1587)
---
.../main/java/org/apache/comet/parquet/Native.java | 5 ++++-
.../org/apache/comet/parquet/NativeBatchReader.java | 19 ++++++++++++++++++-
native/core/src/execution/planner.rs | 5 +++++
native/core/src/parquet/mod.rs | 19 +++++++++++++------
.../apache/comet/parquet/CometParquetFileFormat.scala | 5 +++++
5 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index 4e50190f8..277e8db23 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -256,7 +256,10 @@ public final class Native extends NativeBase {
byte[] filter,
byte[] requiredSchema,
byte[] dataSchema,
- String sessionTimezone);
+ String sessionTimezone,
+ int batchSize,
+ int workerThreads,
+ int blockingThreads);
// arrow native version of read batch
/**
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 4f6991c5d..ea7d4bd4b 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -353,6 +353,20 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
}
}
+ int batchSize =
+ conf.getInt(
+ CometConf.COMET_BATCH_SIZE().key(),
+ (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
+ int workerThreads =
+ conf.getInt(
+ CometConf.COMET_WORKER_THREADS().key(),
+ (Integer) CometConf.COMET_WORKER_THREADS().defaultValue().get());
+ ;
+ int blockingThreads =
+ conf.getInt(
+ CometConf.COMET_BLOCKING_THREADS().key(),
+ (Integer) CometConf.COMET_BLOCKING_THREADS().defaultValue().get());
+ ;
this.handle =
Native.initRecordBatchReader(
filePath,
@@ -362,7 +376,10 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
nativeFilter,
serializedRequestedArrowSchema,
serializedDataArrowSchema,
- timeZoneId);
+ timeZoneId,
+ batchSize,
+ workerThreads,
+ blockingThreads);
isInitialized = true;
}
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 826bb73cc..e8c94ec3c 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -175,6 +175,11 @@ impl PhysicalPlanner {
}
}
+ /// Return session context of this planner.
+ pub fn session_ctx(&self) -> &Arc<SessionContext> {
+ &self.session_ctx
+ }
+
/// get DataFusion PartitionedFiles from a Spark FilePartition
fn get_partitioned_files(
&self,
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 9289d9d42..d522a83aa 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -54,8 +54,9 @@ use crate::parquet::parquet_support::prepare_object_store;
use arrow::array::{Array, RecordBatch};
use arrow::buffer::{Buffer, MutableBuffer};
use datafusion::datasource::listing::PartitionedFile;
-use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{SessionConfig, SessionContext};
use futures::{poll, StreamExt};
use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray,
JString, ReleaseMode};
use jni::sys::jstring;
@@ -650,9 +651,15 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
required_schema: jbyteArray,
data_schema: jbyteArray,
session_timezone: jstring,
+ batch_size: jint,
+ worker_threads: jint,
+ blocking_threads: jint,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| unsafe {
- let task_ctx = TaskContext::default();
+ let session_config = SessionConfig::new().with_batch_size(batch_size
as usize);
+ let planer =
+
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)));
+ let session_ctx = planer.session_ctx();
let path: String = env
.get_string(&JString::from_raw(file_path))
@@ -660,11 +667,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
.into();
let runtime = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(worker_threads as usize)
+ .max_blocking_threads(blocking_threads as usize)
.enable_all()
.build()?;
let (object_store_url, object_store_path) =
- prepare_object_store(task_ctx.runtime_env(), path.clone())?;
+ prepare_object_store(session_ctx.runtime_env(), path.clone())?;
let required_schema_array = JByteArray::from_raw(required_schema);
let required_schema_buffer =
env.convert_byte_array(&required_schema_array)?;
@@ -674,8 +683,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
let data_schema_buffer = env.convert_byte_array(&data_schema_array)?;
let data_schema =
Arc::new(deserialize_schema(data_schema_buffer.as_bytes())?);
- let planer = PhysicalPlanner::default();
-
let data_filters = if !filter.is_null() {
let filter_array = JByteArray::from_raw(filter);
let filter_buffer = env.convert_byte_array(&filter_array)?;
@@ -708,7 +715,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
)?;
let partition_index: usize = 0;
- let batch_stream = Some(scan.execute(partition_index,
Arc::new(task_ctx))?);
+ let batch_stream = Some(scan.execute(partition_index,
session_ctx.task_ctx())?);
let ctx = BatchContext {
runtime,
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index b67f99ad8..cb87f2de3 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -232,6 +232,11 @@ object CometParquetFileFormat extends Logging with ShimSQLConf {
hadoopConf.setBoolean(
CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key,
CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.get())
+ hadoopConf.setInt(CometConf.COMET_BATCH_SIZE.key,
CometConf.COMET_BATCH_SIZE.get())
+ hadoopConf.setInt(CometConf.COMET_WORKER_THREADS.key,
CometConf.COMET_WORKER_THREADS.get())
+ hadoopConf.setInt(
+ CometConf.COMET_BLOCKING_THREADS.key,
+ CometConf.COMET_BLOCKING_THREADS.get())
}
def getDatetimeRebaseSpec(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]