This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e982aade3 fix: make register_object_store use same session_env as file scan (#1555)
e982aade3 is described below

commit e982aade3a03b844f44d0ee58b4403b0d5ef0666
Author: Zhen Wang <[email protected]>
AuthorDate: Tue Apr 1 04:48:13 2025 +0800

    fix: make register_object_store use same session_env as file scan (#1555)
    
    ## Which issue does this PR close?
    
    Part of https://github.com/apache/datafusion-comet/issues/1553
    
    ## Rationale for this change
    `register_object_store` and `file scan` should use the same `session_env`
    so that `file scan` can get the registered `object_store` from `session_env`.
    
    ## What changes are included in this PR?
    
    Make `register_object_store` use the same `session_env` as `file scan`.
    
    ## How are these changes tested?
    
    Passed the scan benchmark locally on HDFS.
    
    
![image](https://github.com/user-attachments/assets/a432b179-95bc-4ec8-8c1a-4f907297705f)
---
 native/core/src/parquet/mod.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 17109d31b..bcf73fc54 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -54,7 +54,6 @@ use arrow::buffer::{Buffer, MutableBuffer};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, 
JString, ReleaseMode};
 use jni::sys::jstring;
@@ -649,6 +648,8 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_parquet_Native_initRecordBat
     session_timezone: jstring,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
+        let task_ctx = TaskContext::default();
+
         let path: String = env
             .get_string(&JString::from_raw(file_path))
             .unwrap()
@@ -657,10 +658,9 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_parquet_Native_initRecordBat
         let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()
             .build()?;
-        let session_ctx = SessionContext::new();
 
         let (object_store_url, object_store_path) =
-            prepare_object_store(session_ctx.runtime_env(), path.clone())?;
+            prepare_object_store(task_ctx.runtime_env(), path.clone())?;
 
         let required_schema_array = JByteArray::from_raw(required_schema);
         let required_schema_buffer = 
env.convert_byte_array(&required_schema_array)?;
@@ -686,9 +686,8 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_parquet_Native_initRecordBat
             session_timezone.as_str(),
         )?;
 
-        let ctx = TaskContext::default();
         let partition_index: usize = 0;
-        let batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
+        let batch_stream = Some(scan.execute(partition_index, 
Arc::new(task_ctx))?);
 
         let ctx = BatchContext {
             runtime,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to