parthchandra commented on code in PR #1992: URL: https://github.com/apache/datafusion-comet/pull/1992#discussion_r2191310903
########## native/core/src/parquet/objectstore/jni.rs: ########## @@ -0,0 +1,332 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use bytes::Bytes; +use chrono::Utc; +use futures::{stream, stream::BoxStream, StreamExt}; +use jni::{ + objects::{JClass, JObject, JValue}, + JNIEnv, JavaVM, +}; +use object_store::{ + path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, + PutOptions, PutPayload, PutResult, +}; +use once_cell::sync::OnceCell; + +static JVM: OnceCell<JavaVM> = OnceCell::new(); + +pub fn init_jvm(env: &JNIEnv) { + let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM")); +} + +fn get_jni_env<'a>() -> jni::AttachGuard<'a> { + JVM.get() + .expect("JVM not initialized") + .attach_current_thread() + .expect("Failed to attach thread") +} + +mod jni_helpers { + use super::*; + + pub fn create_jni_hashmap<'local>( + env: &mut JNIEnv<'local>, + configs: &HashMap<String, String>, + ) -> Result<JObject<'local>, ObjectStoreError> { + let map_class = env.find_class("java/util/HashMap").map_err(jni_error)?; + let jmap = 
env.new_object(map_class, "()V", &[]).map_err(jni_error)?; + + for (k, v) in configs { + let jkey = env.new_string(k).map_err(jni_error)?; + let jval = env.new_string(v).map_err(jni_error)?; + + env.call_method( + &jmap, + "put", + "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", + &[JValue::Object(&jkey), JValue::Object(&jval)], + ) + .map_err(jni_error)?; + } + + Ok(jmap) + } + + pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError { + ObjectStoreError::Generic { + store: "jni", + source: Box::new(e), + } + } + + pub fn get_native_class<'local>( + env: &mut JNIEnv<'local>, + ) -> Result<JClass<'local>, ObjectStoreError> { + env.find_class("org/apache/comet/parquet/Native") + .map_err(jni_error) + } +} + +/// Retrieves the length (in bytes) of a file through JNI interface. +/// +/// This method makes a JNI call to Java to get the size of the file at the specified path, +/// using the provided configuration parameters for the storage backend. +/// +/// # Arguments +/// * `path` - The filesystem path or URI of the target file +/// * `configs` - Configuration parameters for the storage backend as key-value pairs. +/// +/// # Returns +/// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError` +/// if the operation fails. +pub fn call_get_length( Review Comment: just `get_length` perhaps? ########## native/core/Cargo.toml: ########## @@ -50,6 +50,7 @@ lazy_static = "1.4.0" prost = "0.13.5" jni = "0.21" snap = "1.1" +chrono = { version = "0.4", default-features = false, features = ["clock"] } Review Comment: Maybe use the dependency defined in the main Cargo.toml (i.e. make this a workspace dependency)? ########## native/core/src/parquet/objectstore/jni.rs: ########## @@ -0,0 +1,332 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use bytes::Bytes; +use chrono::Utc; +use futures::{stream, stream::BoxStream, StreamExt}; +use jni::{ + objects::{JClass, JObject, JValue}, + JNIEnv, JavaVM, +}; +use object_store::{ + path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, + PutOptions, PutPayload, PutResult, +}; +use once_cell::sync::OnceCell; + +static JVM: OnceCell<JavaVM> = OnceCell::new(); + +pub fn init_jvm(env: &JNIEnv) { + let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM")); +} + +fn get_jni_env<'a>() -> jni::AttachGuard<'a> { + JVM.get() + .expect("JVM not initialized") + .attach_current_thread() + .expect("Failed to attach thread") +} + +mod jni_helpers { + use super::*; + + pub fn create_jni_hashmap<'local>( + env: &mut JNIEnv<'local>, + configs: &HashMap<String, String>, + ) -> Result<JObject<'local>, ObjectStoreError> { + let map_class = env.find_class("java/util/HashMap").map_err(jni_error)?; + let jmap = env.new_object(map_class, "()V", &[]).map_err(jni_error)?; + + for (k, v) in configs { + let jkey = env.new_string(k).map_err(jni_error)?; + let jval = env.new_string(v).map_err(jni_error)?; + + env.call_method( + &jmap, + "put", + "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", + 
&[JValue::Object(&jkey), JValue::Object(&jval)], + ) + .map_err(jni_error)?; + } + + Ok(jmap) + } + + pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError { + ObjectStoreError::Generic { + store: "jni", + source: Box::new(e), + } + } + + pub fn get_native_class<'local>( + env: &mut JNIEnv<'local>, + ) -> Result<JClass<'local>, ObjectStoreError> { + env.find_class("org/apache/comet/parquet/Native") + .map_err(jni_error) + } +} + +/// Retrieves the length (in bytes) of a file through JNI interface. +/// +/// This method makes a JNI call to Java to get the size of the file at the specified path, +/// using the provided configuration parameters for the storage backend. +/// +/// # Arguments +/// * `path` - The filesystem path or URI of the target file +/// * `configs` - Configuration parameters for the storage backend as key-value pairs. +/// +/// # Returns +/// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError` +/// if the operation fails. +pub fn call_get_length( + path: &str, + configs: &HashMap<String, String>, +) -> Result<usize, ObjectStoreError> { + let mut env = get_jni_env(); + let jmap = jni_helpers::create_jni_hashmap(&mut env, configs)?; + let class = jni_helpers::get_native_class(&mut env)?; + let jpath = env.new_string(path).map_err(jni_helpers::jni_error)?; + + let result = env + .call_static_method( + class, + "getLength", + "(Ljava/lang/String;Ljava/util/Map;)J", + &[JValue::Object(&jpath), JValue::Object(&jmap)], + ) + .map_err(jni_helpers::jni_error)? + .j() + .unwrap_or(-1); + + if result < 0 { + Err(ObjectStoreError::NotFound { + path: path.to_string(), + source: Box::new(Arc::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("File not found or error reading: {path}"), + ))), + }) + } else { + Ok(result as usize) + } +} + +/// Reads a range of bytes from a file through JNI interface. 
+/// +/// # Arguments +/// * `raw_path` - The filesystem path or URI of the file to read +/// * `configs` - Configuration parameters for the read operation as key-value pairs +/// * `offset` - The starting byte position to read from (0-based) +/// * `len` - The number of bytes to read +/// +/// # Returns +/// Returns `Ok(Vec<u8>)` containing the requested bytes on success, or an +/// `ObjectStoreError` if the operation fails. +pub fn call_read( + path: &str, + configs: &HashMap<String, String>, + offset: usize, + len: usize, +) -> Result<Vec<u8>, ObjectStoreError> { + let mut env = get_jni_env(); + let jmap = jni_helpers::create_jni_hashmap(&mut env, configs)?; + let class = jni_helpers::get_native_class(&mut env)?; + + let jpath = env.new_string(path).map_err(jni_helpers::jni_error)?; + + let result = env + .call_static_method( + class, + "read", + "(Ljava/lang/String;Ljava/util/Map;JI)[B", + &[ + JValue::Object(&jpath), + JValue::Object(&jmap), + JValue::Long(offset as i64), + JValue::Int(len as i32), + ], + ) + .map_err(jni_helpers::jni_error)?; + + let byte_array = jni::objects::JByteArray::from(result.l().map_err(jni_helpers::jni_error)?); + + if byte_array.is_null() { + return Err(ObjectStoreError::Generic { + store: "jni", + source: "Received null byte array from Java".into(), + }); + } + + let output = env + .convert_byte_array(byte_array) + .map_err(jni_helpers::jni_error)?; + Ok(output) +} + +/// A JNI-backed implementation of [`ObjectStore`] for interacting with storage systems +/// through Java Native Interface. +#[derive(Debug, Clone)] +pub struct JniObjectStore { + base_uri: String, + configs: HashMap<String, String>, +} + +// Mark as thread-safe +unsafe impl Send for JniObjectStore {} +unsafe impl Sync for JniObjectStore {} + +impl JniObjectStore { + /// Creates a new JniObjectStore with the given base URI and configurations. 
+ pub fn new(base_uri: String, configs: HashMap<String, String>) -> Self { + Self { + base_uri: base_uri.trim_end_matches('/').to_string(), + configs, + } + } + + /// Converts a relative path to an absolute URI using the store's base URI. + fn to_absolute_uri(&self, location: &Path) -> String { + let path_str = location.to_string(); + + // If already absolute-looking (s3a://, hdfs://, etc.), return as is + if path_str.contains("://") { + return path_str; + } + + // Handle absolute-looking paths (start with /) + let clean_path = path_str.trim_start_matches('/'); + format!("{}/{}", self.base_uri, clean_path) + } +} + +impl std::fmt::Display for JniObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "JniObjectStore") + } +} + +#[async_trait::async_trait] +impl ObjectStore for JniObjectStore { + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> Result<GetResult, ObjectStoreError> { + let path_str = self.to_absolute_uri(location); + let total_len = call_get_length(&path_str, &self.configs)? 
as u64; + + let range = match options.range { + Some(GetRange::Bounded(range)) => range, + Some(GetRange::Offset(offset)) => offset..total_len, + Some(GetRange::Suffix(length)) => { + let start = total_len.saturating_sub(length); + start..total_len + } + None => 0..total_len, + }; + + if range.end > total_len { + return Err(ObjectStoreError::NotFound { + path: path_str.clone(), + source: format!( + "Invalid range {}-{} for file of length {}", + range.start, range.end, total_len + ) + .into(), + }); + } + + let range_len = (range.end - range.start) as usize; + let bytes = call_read(&path_str, &self.configs, range.start as usize, range_len)?; + + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once(async move { + Ok(Bytes::from(bytes)) + }))), + meta: ObjectMeta { + location: location.clone(), + last_modified: Utc::now(), + size: total_len, + version: None, + e_tag: None, + }, + range, + attributes: Attributes::default(), + }) + } + + async fn put_opts( + &self, + _location: &Path, + _bytes: PutPayload, + _opts: PutOptions, + ) -> Result<PutResult, ObjectStoreError> { + todo!() + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOpts, + ) -> Result<Box<dyn MultipartUpload + 'static>, ObjectStoreError> { + todo!() + } + + async fn delete(&self, _location: &Path) -> Result<(), ObjectStoreError> { + todo!() + } + + fn list( + &self, + _prefix: Option<&Path>, + ) -> BoxStream<'static, Result<ObjectMeta, ObjectStoreError>> { + futures::stream::empty().boxed() Review Comment: Did you intend to return an empty stream here? ########## native/core/src/parquet/parquet_support.rs: ########## @@ -384,6 +392,17 @@ pub(crate) fn prepare_object_store_with_configs( let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" { parse_hdfs_url(&url) + } else if scheme == "s3" && use_jni { Review Comment: Ah, this is a tricky bit. 
If `use_jni` is true we should be able to use any file system available to hadoop. We really need to scan the config for all `spark.hadoop.fs.customfs.impl` and register the jni based object store for every `customfs` ########## common/src/main/java/org/apache/comet/parquet/Native.java: ########## @@ -292,4 +299,104 @@ public static native void currentColumnBatch( * @param handle */ public static native void closeRecordBatchReader(long handle); + Review Comment: Can we move these to a new file, say `JniHDFSBridge` ? ########## native/core/src/parquet/objectstore/jni.rs: ########## @@ -0,0 +1,332 @@ +// Licensed to the Apache Software Foundation (ASF) under one Review Comment: Can we name this jni_hdfs.rs since this is a jni based implementation of an hdfs object store? ########## native/core/src/parquet/objectstore/jni.rs: ########## @@ -0,0 +1,332 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; + +use bytes::Bytes; +use chrono::Utc; +use futures::{stream, stream::BoxStream, StreamExt}; +use jni::{ + objects::{JClass, JObject, JValue}, + JNIEnv, JavaVM, +}; +use object_store::{ + path::Path, Attributes, Error as ObjectStoreError, GetOptions, GetRange, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, + PutOptions, PutPayload, PutResult, +}; +use once_cell::sync::OnceCell; + +static JVM: OnceCell<JavaVM> = OnceCell::new(); + +pub fn init_jvm(env: &JNIEnv) { + let _ = JVM.set(env.get_java_vm().expect("Failed to get JavaVM")); +} + +fn get_jni_env<'a>() -> jni::AttachGuard<'a> { + JVM.get() + .expect("JVM not initialized") + .attach_current_thread() + .expect("Failed to attach thread") +} + +mod jni_helpers { + use super::*; + + pub fn create_jni_hashmap<'local>( + env: &mut JNIEnv<'local>, + configs: &HashMap<String, String>, + ) -> Result<JObject<'local>, ObjectStoreError> { + let map_class = env.find_class("java/util/HashMap").map_err(jni_error)?; + let jmap = env.new_object(map_class, "()V", &[]).map_err(jni_error)?; + + for (k, v) in configs { + let jkey = env.new_string(k).map_err(jni_error)?; + let jval = env.new_string(v).map_err(jni_error)?; + + env.call_method( + &jmap, + "put", + "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", + &[JValue::Object(&jkey), JValue::Object(&jval)], + ) + .map_err(jni_error)?; + } + + Ok(jmap) + } + + pub fn jni_error(e: jni::errors::Error) -> ObjectStoreError { + ObjectStoreError::Generic { + store: "jni", + source: Box::new(e), + } + } + + pub fn get_native_class<'local>( + env: &mut JNIEnv<'local>, + ) -> Result<JClass<'local>, ObjectStoreError> { + env.find_class("org/apache/comet/parquet/Native") + .map_err(jni_error) + } +} + +/// Retrieves the length (in bytes) of a file through JNI interface. 
+/// +/// This method makes a JNI call to Java to get the size of the file at the specified path, +/// using the provided configuration parameters for the storage backend. +/// +/// # Arguments +/// * `path` - The filesystem path or URI of the target file +/// * `configs` - Configuration parameters for the storage backend as key-value pairs. +/// +/// # Returns +/// Returns `Ok(usize)` with the file size in bytes on success, or an `ObjectStoreError` +/// if the operation fails. +pub fn call_get_length( + path: &str, + configs: &HashMap<String, String>, +) -> Result<usize, ObjectStoreError> { + let mut env = get_jni_env(); + let jmap = jni_helpers::create_jni_hashmap(&mut env, configs)?; + let class = jni_helpers::get_native_class(&mut env)?; + let jpath = env.new_string(path).map_err(jni_helpers::jni_error)?; + + let result = env + .call_static_method( + class, + "getLength", + "(Ljava/lang/String;Ljava/util/Map;)J", + &[JValue::Object(&jpath), JValue::Object(&jmap)], + ) + .map_err(jni_helpers::jni_error)? + .j() + .unwrap_or(-1); + + if result < 0 { + Err(ObjectStoreError::NotFound { + path: path.to_string(), + source: Box::new(Arc::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("File not found or error reading: {path}"), + ))), + }) + } else { + Ok(result as usize) + } +} + +/// Reads a range of bytes from a file through JNI interface. +/// +/// # Arguments +/// * `raw_path` - The filesystem path or URI of the file to read +/// * `configs` - Configuration parameters for the read operation as key-value pairs +/// * `offset` - The starting byte position to read from (0-based) +/// * `len` - The number of bytes to read +/// +/// # Returns +/// Returns `Ok(Vec<u8>)` containing the requested bytes on success, or an +/// `ObjectStoreError` if the operation fails. +pub fn call_read( Review Comment: just `read` or maybe `read_range` ? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org