This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new aefb3b684 feat: Add support for remote Parquet HDFS writer with openDAL (#2929)
aefb3b684 is described below

commit aefb3b68400bd9e483e2a9e15e890f4032aff77c
Author: Oleks V <[email protected]>
AuthorDate: Mon Dec 29 12:36:27 2025 -0800

    feat: Add support for remote Parquet HDFS writer with openDAL (#2929)
---
 .github/actions/rust-test/action.yaml              |   2 +
 native/Cargo.lock                                  |  68 +--
 native/core/Cargo.toml                             |   2 +-
 .../core/src/execution/operators/parquet_writer.rs | 560 +++++++++++++++++++--
 native/core/src/lib.rs                             |  22 +-
 native/core/src/parquet/parquet_support.rs         |  63 ++-
 native/fs-hdfs/Cargo.toml                          |   2 +-
 native/hdfs/Cargo.toml                             |   4 +-
 native/hdfs/src/object_store/hdfs.rs               |  10 +-
 .../serde/operator/CometDataWritingCommand.scala   |   5 +-
 10 files changed, 643 insertions(+), 95 deletions(-)
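
For context, the heart of this change is a remote write path that encodes RecordBatches to Parquet in an in-memory buffer and streams the resulting bytes to HDFS through OpenDAL. The sketch below illustrates that pattern with the same opendal and parquet calls the patch uses; the namenode URL, output path, and schema are placeholders rather than values from the commit:

    use std::io::Cursor;
    use std::sync::Arc;

    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use opendal::{services::Hdfs, Operator};
    use parquet::arrow::ArrowWriter;

    async fn write_batch_to_hdfs() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder namenode and destination path -- adjust for a real cluster.
        let op = Operator::new(Hdfs::default().name_node("hdfs://namenode:9000"))?.finish();

        // Encode one RecordBatch to Parquet in an in-memory buffer.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
        )?;
        let mut arrow_writer = ArrowWriter::try_new(Cursor::new(Vec::new()), schema, None)?;
        arrow_writer.write(&batch)?;
        let bytes = arrow_writer.into_inner()?.into_inner();

        // Stream the finished Parquet bytes to HDFS via an opendal Writer.
        let mut hdfs_writer = op.writer("/user/example/data.parquet").await?;
        hdfs_writer.write(bytes).await?;
        hdfs_writer.close().await?;
        Ok(())
    }

The patch wraps this pattern in a ParquetWriter enum (see parquet_writer.rs below) so the same operator can also write directly to the local filesystem.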

diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml
index 4c9b13a17..10fc1375f 100644
--- a/.github/actions/rust-test/action.yaml
+++ b/.github/actions/rust-test/action.yaml
@@ -70,5 +70,7 @@ runs:
       shell: bash
       run: |
         cd native
+        # Set LD_LIBRARY_PATH to include JVM library path for tests that use JNI
+        export LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH}
         RUST_BACKTRACE=1 cargo nextest run
 
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 4d97ee350..dd908ce46 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -598,9 +598,9 @@ dependencies = [
 
 [[package]]
 name = "aws-lc-rs"
-version = "1.15.1"
+version = "1.15.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b5ce75405893cd713f9ab8e297d8e438f624dde7d706108285f7e17a25a180f"
+checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288"
 dependencies = [
  "aws-lc-sys",
  "zeroize",
@@ -608,9 +608,9 @@ dependencies = [
 
 [[package]]
 name = "aws-lc-sys"
-version = "0.34.0"
+version = "0.35.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "179c3777a8b5e70e90ea426114ffc565b2c1a9f82f6c4a0c5a34aa6ef5e781b6"
+checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1"
 dependencies = [
  "cc",
  "cmake",
@@ -789,9 +789,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-json"
-version = "0.61.8"
+version = "0.61.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a6864c190cbb8e30cf4b77b2c8f3b6dfffa697a09b7218d2f7cd3d4c4065a9f7"
+checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551"
 dependencies = [
  "aws-smithy-types",
 ]
@@ -817,9 +817,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime"
-version = "1.9.5"
+version = "1.9.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5"
+checksum = "65fda37911905ea4d3141a01364bc5509a0f32ae3f3b22d6e330c0abfb62d247"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-http",
@@ -1145,9 +1145,9 @@ dependencies = [
 
 [[package]]
 name = "bumpalo"
-version = "3.19.0"
+version = "3.19.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
+checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
 
 [[package]]
 name = "bytecheck"
@@ -1361,9 +1361,9 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
 
 [[package]]
 name = "cmake"
-version = "0.1.54"
+version = "0.1.57"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0"
+checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d"
 dependencies = [
  "cc",
 ]
@@ -1852,7 +1852,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "chrono",
- "fs-hdfs",
+ "datafusion-comet-fs-hdfs3",
  "fs-hdfs3",
  "futures",
  "object_store",
@@ -2728,20 +2728,6 @@ dependencies = [
  "percent-encoding",
 ]
 
-[[package]]
-name = "fs-hdfs"
-version = "0.1.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25f164ff6334da016dffd1c29a3c05b81c35b857ef829d3fa9e58ae8d3e6f76b"
-dependencies = [
- "bindgen 0.64.0",
- "cc",
- "lazy_static",
- "libc",
- "log",
- "url",
-]
-
 [[package]]
 name = "fs-hdfs3"
 version = "0.1.12"
@@ -5005,9 +4991,9 @@ dependencies = [
 
 [[package]]
 name = "roaring"
-version = "0.11.2"
+version = "0.11.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f08d6a905edb32d74a5d5737a0c9d7e950c312f3c46cb0ca0a2ca09ea11878a0"
+checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885"
 dependencies = [
  "bytemuck",
  "byteorder",
@@ -5159,9 +5145,9 @@ dependencies = [
 
 [[package]]
 name = "rustls-pki-types"
-version = "1.13.1"
+version = "1.13.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c"
+checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282"
 dependencies = [
  "web-time",
  "zeroize",
@@ -5878,18 +5864,18 @@ dependencies = [
 
 [[package]]
 name = "toml_datetime"
-version = "0.7.3"
+version = "0.7.5+spec-1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533"
+checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
 dependencies = [
  "serde_core",
 ]
 
 [[package]]
 name = "toml_edit"
-version = "0.23.9"
+version = "0.23.10+spec-1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832"
+checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269"
 dependencies = [
  "indexmap 2.12.1",
  "toml_datetime",
@@ -5899,9 +5885,9 @@ dependencies = [
 
 [[package]]
 name = "toml_parser"
-version = "1.0.4"
+version = "1.0.6+spec-1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e"
+checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
 dependencies = [
  "winnow",
 ]
@@ -5953,9 +5939,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
 
 [[package]]
 name = "tracing"
-version = "0.1.43"
+version = "0.1.44"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647"
+checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
 dependencies = [
  "pin-project-lite",
  "tracing-attributes",
@@ -5975,9 +5961,9 @@ dependencies = [
 
 [[package]]
 name = "tracing-core"
-version = "0.1.35"
+version = "0.1.36"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c"
+checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
 dependencies = [
  "once_cell",
 ]
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 0663a118d..7b32be36a 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -99,7 +99,7 @@ datafusion-functions-nested = { version = "51.0.0" }
 
 [features]
 backtrace = ["datafusion/backtrace"]
-default = []
+default = ["hdfs-opendal"]
 hdfs = ["datafusion-comet-objectstore-hdfs"]
 hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"]
 jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs
index 57246abf7..2ca1e9cfd 100644
--- a/native/core/src/execution/operators/parquet_writer.rs
+++ b/native/core/src/execution/operators/parquet_writer.rs
@@ -22,9 +22,13 @@ use std::{
     fmt,
     fmt::{Debug, Formatter},
     fs::File,
+    io::Cursor,
     sync::Arc,
 };
 
+use opendal::{services::Hdfs, Operator};
+use url::Url;
+
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -49,6 +53,134 @@ use parquet::{
 
 use crate::execution::shuffle::CompressionCodec;
 
+/// Enum representing different types of Arrow writers based on storage backend
+enum ParquetWriter {
+    /// Writer for local file system
+    LocalFile(ArrowWriter<File>),
+    /// Writer for HDFS or other remote storage (writes to in-memory buffer)
+    /// Contains the arrow writer, HDFS operator, and destination path
+    /// The Arrow writer converts the data to Parquet format and writes it to the in-memory buffer
+    /// The opendal::Writer is created lazily on first write
+    Remote(
+        ArrowWriter<Cursor<Vec<u8>>>,
+        Option<opendal::Writer>,
+        Operator,
+        String,
+    ),
+}
+
+impl ParquetWriter {
+    /// Write a RecordBatch to the underlying writer
+    async fn write(
+        &mut self,
+        batch: &RecordBatch,
+    ) -> std::result::Result<(), parquet::errors::ParquetError> {
+        match self {
+            ParquetWriter::LocalFile(writer) => writer.write(batch),
+            ParquetWriter::Remote(
+                arrow_parquet_buffer_writer,
+                hdfs_writer_opt,
+                op,
+                output_path,
+            ) => {
+                // Write batch to in-memory buffer
+                arrow_parquet_buffer_writer.write(batch)?;
+
+                // Flush and get the current buffer content
+                arrow_parquet_buffer_writer.flush()?;
+                let cursor = arrow_parquet_buffer_writer.inner_mut();
+                let current_data = cursor.get_ref().clone();
+
+                // Create HDFS writer lazily on first write
+                if hdfs_writer_opt.is_none() {
+                    let writer = op.writer(output_path.as_str()).await.map_err(|e| {
+                        parquet::errors::ParquetError::External(
+                            format!("Failed to create HDFS writer for '{}': {}", output_path, e)
+                                .into(),
+                        )
+                    })?;
+                    *hdfs_writer_opt = Some(writer);
+                }
+
+                // Write the accumulated data to HDFS
+                if let Some(hdfs_writer) = hdfs_writer_opt {
+                    hdfs_writer.write(current_data).await.map_err(|e| {
+                        parquet::errors::ParquetError::External(
+                            format!(
+                                "Failed to write batch to HDFS file '{}': {}",
+                                output_path, e
+                            )
+                            .into(),
+                        )
+                    })?;
+                }
+
+                // Clear the buffer after upload
+                cursor.get_mut().clear();
+                cursor.set_position(0);
+
+                Ok(())
+            }
+        }
+    }
+
+    /// Close the writer and finalize the file
+    async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> {
+        match self {
+            ParquetWriter::LocalFile(writer) => {
+                writer.close()?;
+                Ok(())
+            }
+            ParquetWriter::Remote(
+                arrow_parquet_buffer_writer,
+                mut hdfs_writer_opt,
+                op,
+                output_path,
+            ) => {
+                // Close the arrow writer to finalize parquet format
+                let cursor = arrow_parquet_buffer_writer.into_inner()?;
+                let final_data = cursor.into_inner();
+
+                // Create HDFS writer if not already created
+                if hdfs_writer_opt.is_none() && !final_data.is_empty() {
+                    let writer = op.writer(output_path.as_str()).await.map_err(|e| {
+                        parquet::errors::ParquetError::External(
+                            format!("Failed to create HDFS writer for '{}': {}", output_path, e)
+                                .into(),
+                        )
+                    })?;
+                    hdfs_writer_opt = Some(writer);
+                }
+
+                // Write any remaining data
+                if !final_data.is_empty() {
+                    if let Some(mut hdfs_writer) = hdfs_writer_opt {
+                        hdfs_writer.write(final_data).await.map_err(|e| {
+                            parquet::errors::ParquetError::External(
+                                format!(
+                                    "Failed to write final data to HDFS file '{}': {}",
+                                    output_path, e
+                                )
+                                .into(),
+                            )
+                        })?;
+
+                        // Close the HDFS writer
+                        hdfs_writer.close().await.map_err(|e| {
+                            parquet::errors::ParquetError::External(
+                                format!("Failed to close HDFS writer for '{}': {}", output_path, e)
+                                    .into(),
+                            )
+                        })?;
+                    }
+                }
+
+                Ok(())
+            }
+        }
+    }
+}
+
 /// Parquet writer operator that writes input batches to a Parquet file
 #[derive(Debug)]
 pub struct ParquetWriterExec {
@@ -119,6 +251,129 @@ impl ParquetWriterExec {
             CompressionCodec::Snappy => Ok(Compression::SNAPPY),
         }
     }
+
+    /// Create an Arrow writer based on the storage scheme
+    ///
+    /// # Arguments
+    /// * `output_file_path` - The full path to the output file; the storage scheme ("hdfs", "s3", or "local") is inferred from its prefix
+    /// * `schema` - The Arrow schema for the Parquet file
+    /// * `props` - Writer properties including compression
+    ///
+    /// # Returns
+    /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
+    /// * `Err(DataFusionError)` - If writer creation fails
+    fn create_arrow_writer(
+        output_file_path: &str,
+        schema: SchemaRef,
+        props: WriterProperties,
+    ) -> Result<ParquetWriter> {
+        // Determine storage scheme from output_file_path
+        let storage_scheme = if output_file_path.starts_with("hdfs://") {
+            "hdfs"
+        } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
+            "s3"
+        } else {
+            "local"
+        };
+
+        match storage_scheme {
+            "hdfs" => {
+                // Parse the output_file_path to extract namenode and path
+                // Expected format: hdfs://namenode:port/path/to/file
+                let url = Url::parse(output_file_path).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to parse HDFS URL '{}': {}",
+                        output_file_path, e
+                    ))
+                })?;
+
+                // Extract namenode (scheme + host + port)
+                let namenode = format!(
+                    "{}://{}{}",
+                    url.scheme(),
+                    url.host_str().unwrap_or("localhost"),
+                    url.port()
+                        .map(|p| format!(":{}", p))
+                        .unwrap_or_else(|| ":9000".to_string())
+                );
+
+                // Extract the path (without the scheme and host)
+                let hdfs_path = url.path().to_string();
+
+                // For remote storage (HDFS, S3), write to an in-memory buffer
+                let buffer = Vec::new();
+                let cursor = Cursor::new(buffer);
+                let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
+                    .map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "Failed to create {} writer: {}",
+                            storage_scheme, e
+                        ))
+                    })?;
+
+                let builder = Hdfs::default().name_node(&namenode);
+                let op = Operator::new(builder)
+                    .map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "Failed to create HDFS operator for '{}' (namenode: {}): {}",
+                            output_file_path, namenode, e
+                        ))
+                    })?
+                    .finish();
+
+                // HDFS writer will be created lazily on first write
+                // Use only the path part for the HDFS writer
+                Ok(ParquetWriter::Remote(
+                    arrow_parquet_buffer_writer,
+                    None,
+                    op,
+                    hdfs_path,
+                ))
+            }
+            "local" => {
+                // For a local file system, write directly to file
+                // Strip file:// or file: prefix if present
+                let local_path = output_file_path
+                    .strip_prefix("file://")
+                    .or_else(|| output_file_path.strip_prefix("file:"))
+                    .unwrap_or(output_file_path);
+
+                // Extract the parent directory from the file path
+                let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| {
+                    DataFusionError::Execution(format!(
+                        "Failed to extract parent directory from path '{}'",
+                        local_path
+                    ))
+                })?;
+
+                // Create the parent directory if it doesn't exist
+                std::fs::create_dir_all(output_dir).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create output directory '{}': {}",
+                        output_dir.display(),
+                        e
+                    ))
+                })?;
+
+                let file = File::create(local_path).map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to create output file '{}': {}",
+                        local_path, e
+                    ))
+                })?;
+
+                let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| {
+                    DataFusionError::Execution(format!("Failed to create local file writer: {}", e))
+                })?;
+                Ok(ParquetWriter::LocalFile(writer))
+            }
+            _ => Err(DataFusionError::Execution(format!(
+                "Unsupported storage scheme: {}",
+                storage_scheme
+            ))),
+        }
+    }
 }
 
 impl DisplayAs for ParquetWriterExec {
@@ -217,47 +472,23 @@ impl ExecutionPlan for ParquetWriterExec {
             .collect();
         let output_schema = Arc::new(arrow::datatypes::Schema::new(fields));
 
-        // Strip file:// or file: prefix if present
-        let local_path = work_dir
-            .strip_prefix("file://")
-            .or_else(|| work_dir.strip_prefix("file:"))
-            .unwrap_or(&work_dir)
-            .to_string();
-
-        // Create output directory
-        std::fs::create_dir_all(&local_path).map_err(|e| {
-            DataFusionError::Execution(format!(
-                "Failed to create output directory '{}': {}",
-                local_path, e
-            ))
-        })?;
-
         // Generate part file name for this partition
         // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename
         let part_file = if let Some(attempt_id) = task_attempt_id {
             format!(
                 "{}/part-{:05}-{:05}.parquet",
-                local_path, self.partition_id, attempt_id
+                work_dir, self.partition_id, attempt_id
             )
         } else {
-            format!("{}/part-{:05}.parquet", local_path, self.partition_id)
+            format!("{}/part-{:05}.parquet", work_dir, self.partition_id)
         };
 
-        // Create the Parquet file
-        let file = File::create(&part_file).map_err(|e| {
-            DataFusionError::Execution(format!(
-                "Failed to create output file '{}': {}",
-                part_file, e
-            ))
-        })?;
-
         // Configure writer properties
         let props = WriterProperties::builder()
             .set_compression(compression)
             .build();
 
-        let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props))
-            .map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?;
+        let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
 
         // Clone schema for use in async closure
         let schema_for_write = Arc::clone(&output_schema);
@@ -286,12 +517,12 @@ impl ExecutionPlan for ParquetWriterExec {
                     batch
                 };
 
-                writer.write(&renamed_batch).map_err(|e| {
+                writer.write(&renamed_batch).await.map_err(|e| {
                     DataFusionError::Execution(format!("Failed to write batch: {}", e))
                 })?;
             }
 
-            writer.close().map_err(|e| {
+            writer.close().await.map_err(|e| {
                 DataFusionError::Execution(format!("Failed to close writer: {}", e))
             })?;
 
@@ -322,3 +553,274 @@ impl ExecutionPlan for ParquetWriterExec {
         )))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, StringArray};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use std::sync::Arc;
+
+    /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data
+    /// Example: batch_id 1 -> 0..1000, 2 -> 1000..2000
+    fn create_test_record_batch(batch_id: i32) -> Result<RecordBatch> {
+        assert!(batch_id > 0, "batch_id must be greater than 0");
+        let num_rows = batch_id * 1000;
+
+        let int_array = Int32Array::from_iter_values(((batch_id - 1) * 1000)..num_rows);
+
+        let string_values: Vec<String> = (((batch_id - 1) * 1000)..num_rows)
+            .map(|i| format!("value_{}", i))
+            .collect();
+        let string_array = StringArray::from(string_values);
+
+        // Define schema
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        // Create RecordBatch
+        RecordBatch::try_new(schema, vec![Arc::new(int_array), Arc::new(string_array)])
+            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "hdfs-opendal")]
+    #[ignore = "This test requires a running HDFS cluster"]
+    async fn test_write_to_hdfs_sync() -> Result<()> {
+        use opendal::services::Hdfs;
+        use opendal::Operator;
+
+        // Configure HDFS connection
+        let namenode = "hdfs://namenode:9000";
+        let output_path = "/user/test_write/data.parquet";
+
+        // Create OpenDAL HDFS operator
+        let builder = Hdfs::default().name_node(namenode);
+        let op = Operator::new(builder)
+            .map_err(|e| {
+                DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e))
+            })?
+            .finish();
+
+        let mut hdfs_writer = op.writer(output_path).await.map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
+        })?;
+
+        let mut buffer = Cursor::new(Vec::new());
+        let mut writer =
+            ArrowWriter::try_new(&mut buffer, create_test_record_batch(1)?.schema(), None)?;
+
+        for i in 1..=5 {
+            let record_batch = create_test_record_batch(i)?;
+
+            writer.write(&record_batch)?;
+
+            println!(
+                "Successfully wrote 1000 rows to HDFS at {}{}",
+                namenode, output_path
+            );
+        }
+
+        writer.close()?;
+
+        hdfs_writer.write(buffer.into_inner()).await.map_err(|e| {
+            DataFusionError::Execution(format!("Failed to write with HDFS writer: {}", e))
+        })?;
+
+        hdfs_writer.close().await.map_err(|e| {
+            DataFusionError::Execution(format!("Failed to close HDFS writer: {}", e))
+        })?;
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "hdfs-opendal")]
+    #[ignore = "This test requires a running HDFS cluster"]
+    async fn test_write_to_hdfs_streaming() -> Result<()> {
+        use opendal::services::Hdfs;
+        use opendal::Operator;
+
+        // Configure HDFS connection
+        let namenode = "hdfs://namenode:9000";
+        let output_path = "/user/test_write_streaming/data.parquet";
+
+        // Create OpenDAL HDFS operator
+        let builder = Hdfs::default().name_node(namenode);
+        let op = Operator::new(builder)
+            .map_err(|e| {
+                DataFusionError::Execution(format!("Failed to create HDFS operator: {}", e))
+            })?
+            .finish();
+
+        // Create a single HDFS writer for the entire file
+        let mut hdfs_writer = op.writer(output_path).await.map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
+        })?;
+
+        // Create a single ArrowWriter that will be used for all batches
+        let buffer = Cursor::new(Vec::new());
+        let mut writer = ArrowWriter::try_new(buffer, create_test_record_batch(1)?.schema(), None)?;
+
+        // Write each batch and upload to HDFS immediately (streaming approach)
+        for i in 1..=5 {
+            let record_batch = create_test_record_batch(i)?;
+
+            // Write the batch to the parquet writer
+            writer.write(&record_batch)?;
+
+            // Flush the writer to ensure data is written to the buffer
+            writer.flush()?;
+
+            // Get the current buffer content through the writer
+            let cursor = writer.inner_mut();
+            let current_data = cursor.get_ref().clone();
+
+            // Write the accumulated data to HDFS
+            hdfs_writer.write(current_data).await.map_err(|e| {
+                DataFusionError::Execution(format!("Failed to write batch {} to HDFS: {}", i, e))
+            })?;
+
+            // Clear the buffer for the next iteration
+            cursor.get_mut().clear();
+            cursor.set_position(0);
+
+            println!(
+                "Successfully streamed batch {} (1000 rows) to HDFS at {}{}",
+                i, namenode, output_path
+            );
+        }
+
+        // Close the ArrowWriter to finalize the parquet file
+        let cursor = writer.into_inner()?;
+
+        // Write any remaining data from closing the writer
+        let final_data = cursor.into_inner();
+        if !final_data.is_empty() {
+            hdfs_writer.write(final_data).await.map_err(|e| {
+                DataFusionError::Execution(format!("Failed to write final data to HDFS: {}", e))
+            })?;
+        }
+
+        // Close the HDFS writer
+        hdfs_writer.close().await.map_err(|e| {
+            DataFusionError::Execution(format!("Failed to close HDFS writer: {}", e))
+        })?;
+
+        println!(
+            "Successfully completed streaming write of 5 batches (5000 total rows) to HDFS at {}{}",
+            namenode, output_path
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "hdfs-opendal")]
+    #[ignore = "This test requires a running HDFS cluster"]
+    async fn test_parquet_writer_streaming() -> Result<()> {
+        // Configure output path
+        let output_path = "/user/test_parquet_writer_streaming/data.parquet";
+
+        // Configure writer properties
+        let props = WriterProperties::builder()
+            .set_compression(Compression::UNCOMPRESSED)
+            .build();
+
+        // Create ParquetWriter using the create_arrow_writer method
+        // Use full HDFS URL format
+        let full_output_path = format!("hdfs://namenode:9000{}", output_path);
+        let mut writer = ParquetWriterExec::create_arrow_writer(
+            &full_output_path,
+            create_test_record_batch(1)?.schema(),
+            props,
+        )?;
+
+        // Write 5 batches in a loop
+        for i in 1..=5 {
+            let record_batch = create_test_record_batch(i)?;
+
+            writer.write(&record_batch).await.map_err(|e| {
+                DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e))
+            })?;
+
+            println!(
+                "Successfully wrote batch {} (1000 rows) using ParquetWriter",
+                i
+            );
+        }
+
+        // Close the writer
+        writer
+            .close()
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?;
+
+        println!(
+            "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to HDFS at {}",
+            output_path
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "hdfs-opendal")]
+    #[ignore = "This test requires a running HDFS cluster"]
+    async fn test_parquet_writer_exec_with_memory_input() -> Result<()> {
+        use datafusion::datasource::memory::MemorySourceConfig;
+        use datafusion::datasource::source::DataSourceExec;
+        use datafusion::prelude::SessionContext;
+
+        // Create 5 batches for the DataSourceExec input
+        let mut batches = Vec::new();
+        for i in 1..=5 {
+            batches.push(create_test_record_batch(i)?);
+        }
+
+        // Get schema from the first batch
+        let schema = batches[0].schema();
+
+        // Create DataSourceExec with MemorySourceConfig containing the 5 batches as a single partition
+        let partitions = vec![batches];
+        let memory_source_config = MemorySourceConfig::try_new(&partitions, schema, None)?;
+        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(memory_source_config)));
+
+        // Create ParquetWriterExec with DataSourceExec as input
+        let output_path = "unused".to_string();
+        let work_dir = "hdfs://namenode:9000/user/test_parquet_writer_exec".to_string();
+        let column_names = vec!["id".to_string(), "name".to_string()];
+
+        let parquet_writer = ParquetWriterExec::try_new(
+            memory_exec,
+            output_path,
+            work_dir,
+            None,      // job_id
+            Some(123), // task_attempt_id
+            CompressionCodec::None,
+            0, // partition_id
+            column_names,
+        )?;
+
+        // Create a session context and execute the plan
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        // Execute partition 0
+        let mut stream = parquet_writer.execute(0, task_ctx)?;
+
+        // Consume the stream (this triggers the write)
+        while let Some(batch_result) = stream.try_next().await? {
+            // The stream should be empty as ParquetWriterExec returns empty batches
+            assert_eq!(batch_result.num_rows(), 0);
+        }
+
+        println!(
+            "Successfully completed ParquetWriterExec test with DataSourceExec input (5 batches, 5000 total rows)"
+        );
+
+        Ok(())
+    }
+}
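
As a side note on create_arrow_writer above: the namenode/path split it performs on an hdfs:// URL can be shown in isolation. This is only a sketch of that parsing step with the url crate; the example path is made up, and the ":9000" fallback mirrors the default used in the patch:

    use url::Url;

    /// Split an hdfs:// URL into (namenode, path), mirroring the parsing in
    /// ParquetWriterExec::create_arrow_writer; ":9000" is the fallback port.
    fn split_hdfs_url(output_file_path: &str) -> Result<(String, String), url::ParseError> {
        let url = Url::parse(output_file_path)?;
        let namenode = format!(
            "{}://{}{}",
            url.scheme(),
            url.host_str().unwrap_or("localhost"),
            url.port()
                .map(|p| format!(":{}", p))
                .unwrap_or_else(|| ":9000".to_string())
        );
        Ok((namenode, url.path().to_string()))
    }

    fn main() {
        // Hypothetical path, used only to show the split.
        let (namenode, path) =
            split_hdfs_url("hdfs://namenode:9000/user/out/part-00000.parquet").unwrap();
        assert_eq!(namenode, "hdfs://namenode:9000");
        assert_eq!(path, "/user/out/part-00000.parquet");
    }
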
diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs
index 10ecefad5..2b883bd7d 100644
--- a/native/core/src/lib.rs
+++ b/native/core/src/lib.rs
@@ -39,10 +39,17 @@ use log4rs::{
 };
 use once_cell::sync::OnceCell;
 
-#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
+#[cfg(all(
+    not(target_env = "msvc"),
+    feature = "jemalloc",
+    not(feature = "mimalloc")
+))]
 use tikv_jemallocator::Jemalloc;
 
-#[cfg(feature = "mimalloc")]
+#[cfg(all(
+    feature = "mimalloc",
+    not(all(not(target_env = "msvc"), feature = "jemalloc"))
+))]
 use mimalloc::MiMalloc;
 
 use errors::{try_unwrap_or_throw, CometError, CometResult};
@@ -55,11 +62,18 @@ pub mod execution;
 mod jvm_bridge;
 pub mod parquet;
 
-#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
+#[cfg(all(
+    not(target_env = "msvc"),
+    feature = "jemalloc",
+    not(feature = "mimalloc")
+))]
 #[global_allocator]
 static GLOBAL: Jemalloc = Jemalloc;
 
-#[cfg(feature = "mimalloc")]
+#[cfg(all(
+    feature = "mimalloc",
+    not(all(not(target_env = "msvc"), feature = "jemalloc"))
+))]
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 0b5c45d24..c9a27d7dc 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -369,9 +369,11 @@ fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) ->
     }
 }
 
-// Mirrors object_store::parse::parse_url for the hdfs object store
-#[cfg(feature = "hdfs")]
-fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+// Creates an HDFS object store from a URL using the native HDFS implementation
+#[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))]
+fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     match 
datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
     {
         Some(object_store) => {
@@ -385,8 +387,11 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_stor
     }
 }
 
+// Creates an HDFS object store from a URL using OpenDAL
 #[cfg(feature = "hdfs-opendal")]
-fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+fn create_hdfs_object_store(
+    url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     let name_node = get_name_node_uri(url)?;
     let builder = opendal::services::Hdfs::default().name_node(&name_node);
 
@@ -422,8 +427,11 @@ fn get_name_node_uri(url: &Url) -> Result<String, object_store::Error> {
     }
 }
 
+// Stub implementation when HDFS support is not enabled
 #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
-fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+fn create_hdfs_object_store(
+    _url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     Err(object_store::Error::Generic {
         store: "HadoopFileSystem",
         source: "Hdfs support is not enabled in this build".into(),
@@ -454,7 +462,7 @@ pub(crate) fn prepare_object_store_with_configs(
     );
 
     let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
-        parse_hdfs_url(&url)
+        create_hdfs_object_store(&url)
     } else if scheme == "s3" {
         objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
     } else {
@@ -469,24 +477,59 @@ pub(crate) fn prepare_object_store_with_configs(
 
 #[cfg(test)]
 mod tests {
-    use crate::execution::operators::ExecutionError;
-    use crate::parquet::parquet_support::prepare_object_store_with_configs;
+    #[cfg(any(
+        all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+        feature = "hdfs"
+    ))]
     use datafusion::execution::object_store::ObjectStoreUrl;
+    #[cfg(any(
+        all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+        feature = "hdfs"
+    ))]
     use datafusion::execution::runtime_env::RuntimeEnv;
+    #[cfg(any(
+        all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+        feature = "hdfs"
+    ))]
     use object_store::path::Path;
-    use std::collections::HashMap;
+    #[cfg(any(
+        all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+        feature = "hdfs"
+    ))]
     use std::sync::Arc;
+    #[cfg(any(
+        all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+        feature = "hdfs"
+    ))]
     use url::Url;
 
+    #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
+    use crate::execution::operators::ExecutionError;
+    #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
+    use std::collections::HashMap;
+
     /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+    #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
     pub(crate) fn prepare_object_store(
         runtime_env: Arc<RuntimeEnv>,
         url: String,
     ) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+        use crate::parquet::parquet_support::prepare_object_store_with_configs;
+        prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
+    }
+
+    /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+    #[cfg(feature = "hdfs")]
+    pub(crate) fn prepare_object_store(
+        runtime_env: Arc<RuntimeEnv>,
+        url: String,
+    ) -> Result<(ObjectStoreUrl, Path), crate::execution::operators::ExecutionError> {
+        use crate::parquet::parquet_support::prepare_object_store_with_configs;
+        use std::collections::HashMap;
         prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
     }
 
-    #[cfg(not(feature = "hdfs"))]
+    #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
     #[test]
     fn test_prepare_object_store() {
         use crate::execution::operators::ExecutionError;
diff --git a/native/fs-hdfs/Cargo.toml b/native/fs-hdfs/Cargo.toml
index 012f8014a..d2aeb44a2 100644
--- a/native/fs-hdfs/Cargo.toml
+++ b/native/fs-hdfs/Cargo.toml
@@ -32,7 +32,7 @@ publish = false
 readme = "README.md"
 
 [lib]
-name = "hdfs"
+name = "fs_hdfs"
 path = "src/lib.rs"
 
 [features]
diff --git a/native/hdfs/Cargo.toml b/native/hdfs/Cargo.toml
index dc8f970ef..c66d71c20 100644
--- a/native/hdfs/Cargo.toml
+++ b/native/hdfs/Cargo.toml
@@ -33,7 +33,7 @@ edition = { workspace = true }
 
 [features]
 default = ["hdfs", "try_spawn_blocking"]
-hdfs = ["fs-hdfs"]
+hdfs = ["fs_hdfs"]
 hdfs3 = ["fs-hdfs3"]
 # Used for trying to spawn a blocking thread for implementing each object store interface when running in a tokio runtime
 try_spawn_blocking = []
@@ -42,7 +42,7 @@ try_spawn_blocking = []
 async-trait = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
-fs-hdfs = { version = "^0.1.12", optional = true }
+fs_hdfs = { package = "datafusion-comet-fs-hdfs3", path = "../fs-hdfs", optional = true }
 fs-hdfs3 = { version = "^0.1.12", optional = true }
 futures = { workspace = true }
 object_store = { workspace = true }
diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs
index b49e87942..a93774cff 100644
--- a/native/hdfs/src/object_store/hdfs.rs
+++ b/native/hdfs/src/object_store/hdfs.rs
@@ -26,9 +26,9 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
+use fs_hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
+use fs_hdfs::walkdir::HdfsWalkDir;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
-use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
-use hdfs::walkdir::HdfsWalkDir;
 use object_store::{
     path::{self, Path},
     Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
@@ -422,7 +422,7 @@ impl ObjectStore for HadoopFileSystem {
                 hdfs.delete(&to, false).map_err(to_error)?;
             }
 
-            hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
+            fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
                 .map_err(to_error)?;
 
             Ok(())
@@ -437,7 +437,7 @@ impl ObjectStore for HadoopFileSystem {
         let to = HadoopFileSystem::path_to_filesystem(to);
 
         maybe_spawn_blocking(move || {
-            hdfs.rename(&from, &to).map_err(to_error)?;
+            hdfs.rename(&from, &to, true).map_err(to_error)?;
 
             Ok(())
         })
@@ -459,7 +459,7 @@ impl ObjectStore for HadoopFileSystem {
                 });
             }
 
-            hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
+            fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
                 .map_err(to_error)?;
 
             Ok(())
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
index 7fdf05521..834932984 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -52,8 +52,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       case cmd: InsertIntoHadoopFsRelationCommand =>
         cmd.fileFormat match {
           case _: ParquetFileFormat =>
-            if (!cmd.outputPath.toString.startsWith("file:")) {
-              return Unsupported(Some("Only local filesystem output paths are supported"))
+            if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString
+                .startsWith("hdfs:")) {
+              return Unsupported(Some("Only HDFS/local filesystems output paths are supported"))
             }
 
             if (cmd.bucketSpec.isDefined) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

