This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new aefb3b684 feat: Add support for remote Parquet HDFS writer with openDAL (#2929)
aefb3b684 is described below
commit aefb3b68400bd9e483e2a9e15e890f4032aff77c
Author: Oleks V <[email protected]>
AuthorDate: Mon Dec 29 12:36:27 2025 -0800
feat: Add support for remote Parquet HDFS writer with openDAL (#2929)
---
.github/actions/rust-test/action.yaml | 2 +
native/Cargo.lock | 68 +--
native/core/Cargo.toml | 2 +-
.../core/src/execution/operators/parquet_writer.rs | 560 +++++++++++++++++++--
native/core/src/lib.rs | 22 +-
native/core/src/parquet/parquet_support.rs | 63 ++-
native/fs-hdfs/Cargo.toml | 2 +-
native/hdfs/Cargo.toml | 4 +-
native/hdfs/src/object_store/hdfs.rs | 10 +-
.../serde/operator/CometDataWritingCommand.scala | 5 +-
10 files changed, 643 insertions(+), 95 deletions(-)
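
For readers skimming the diff: the new ParquetWriter::Remote variant below encodes batches to Parquet in an in-memory buffer and appends each flushed chunk to HDFS through OpenDAL. A minimal sketch of that buffer-and-append pattern, assuming the hdfs-opendal feature is enabled; the namenode address and output path here are hypothetical:

    use std::io::Cursor;

    use opendal::{services::Hdfs, Operator};
    use parquet::arrow::ArrowWriter;

    async fn write_batches(
        batches: &[arrow::record_batch::RecordBatch],
    ) -> Result<(), Box<dyn std::error::Error>> {
        // OpenDAL operator pointed at a (hypothetical) namenode.
        let op = Operator::new(Hdfs::default().name_node("hdfs://namenode:9000"))?.finish();
        let mut remote = op.writer("/user/out/part-00000.parquet").await?;

        // Parquet bytes are produced into an in-memory cursor (assumes `batches` is non-empty) ...
        let mut arrow_writer =
            ArrowWriter::try_new(Cursor::new(Vec::new()), batches[0].schema(), None)?;
        for batch in batches {
            arrow_writer.write(batch)?;
            arrow_writer.flush()?;
            // ... and each flushed chunk is appended to the HDFS file, then the
            // local buffer is cleared so memory stays bounded per batch.
            let cursor = arrow_writer.inner_mut();
            remote.write(cursor.get_ref().clone()).await?;
            cursor.get_mut().clear();
            cursor.set_position(0);
        }
        // Closing the ArrowWriter emits the Parquet footer; append it and finalize.
        remote.write(arrow_writer.into_inner()?.into_inner()).await?;
        remote.close().await?;
        Ok(())
    }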
diff --git a/.github/actions/rust-test/action.yaml b/.github/actions/rust-test/action.yaml
index 4c9b13a17..10fc1375f 100644
--- a/.github/actions/rust-test/action.yaml
+++ b/.github/actions/rust-test/action.yaml
@@ -70,5 +70,7 @@ runs:
shell: bash
run: |
cd native
+ # Set LD_LIBRARY_PATH to include JVM library path for tests that use JNI
+ export LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH}
RUST_BACKTRACE=1 cargo nextest run
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 4d97ee350..dd908ce46 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -598,9 +598,9 @@ dependencies = [
[[package]]
name = "aws-lc-rs"
-version = "1.15.1"
+version = "1.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b5ce75405893cd713f9ab8e297d8e438f624dde7d706108285f7e17a25a180f"
+checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288"
dependencies = [
"aws-lc-sys",
"zeroize",
@@ -608,9 +608,9 @@ dependencies = [
[[package]]
name = "aws-lc-sys"
-version = "0.34.0"
+version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "179c3777a8b5e70e90ea426114ffc565b2c1a9f82f6c4a0c5a34aa6ef5e781b6"
+checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1"
dependencies = [
"cc",
"cmake",
@@ -789,9 +789,9 @@ dependencies = [
[[package]]
name = "aws-smithy-json"
-version = "0.61.8"
+version = "0.61.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a6864c190cbb8e30cf4b77b2c8f3b6dfffa697a09b7218d2f7cd3d4c4065a9f7"
+checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551"
dependencies = [
"aws-smithy-types",
]
@@ -817,9 +817,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
-version = "1.9.5"
+version = "1.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5"
+checksum = "65fda37911905ea4d3141a01364bc5509a0f32ae3f3b22d6e330c0abfb62d247"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -1145,9 +1145,9 @@ dependencies = [
[[package]]
name = "bumpalo"
-version = "3.19.0"
+version = "3.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
+checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
[[package]]
name = "bytecheck"
@@ -1361,9 +1361,9 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "cmake"
-version = "0.1.54"
+version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0"
+checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d"
dependencies = [
"cc",
]
@@ -1852,7 +1852,7 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
- "fs-hdfs",
+ "datafusion-comet-fs-hdfs3",
"fs-hdfs3",
"futures",
"object_store",
@@ -2728,20 +2728,6 @@ dependencies = [
"percent-encoding",
]
-[[package]]
-name = "fs-hdfs"
-version = "0.1.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25f164ff6334da016dffd1c29a3c05b81c35b857ef829d3fa9e58ae8d3e6f76b"
-dependencies = [
- "bindgen 0.64.0",
- "cc",
- "lazy_static",
- "libc",
- "log",
- "url",
-]
-
[[package]]
name = "fs-hdfs3"
version = "0.1.12"
@@ -5005,9 +4991,9 @@ dependencies = [
[[package]]
name = "roaring"
-version = "0.11.2"
+version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f08d6a905edb32d74a5d5737a0c9d7e950c312f3c46cb0ca0a2ca09ea11878a0"
+checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885"
dependencies = [
"bytemuck",
"byteorder",
@@ -5159,9 +5145,9 @@ dependencies = [
[[package]]
name = "rustls-pki-types"
-version = "1.13.1"
+version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c"
+checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282"
dependencies = [
"web-time",
"zeroize",
@@ -5878,18 +5864,18 @@ dependencies = [
[[package]]
name = "toml_datetime"
-version = "0.7.3"
+version = "0.7.5+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533"
+checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
dependencies = [
"serde_core",
]
[[package]]
name = "toml_edit"
-version = "0.23.9"
+version = "0.23.10+spec-1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832"
+checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269"
dependencies = [
"indexmap 2.12.1",
"toml_datetime",
@@ -5899,9 +5885,9 @@ dependencies = [
[[package]]
name = "toml_parser"
-version = "1.0.4"
+version = "1.0.6+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e"
+checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
dependencies = [
"winnow",
]
@@ -5953,9 +5939,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
-version = "0.1.43"
+version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647"
+checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"pin-project-lite",
"tracing-attributes",
@@ -5975,9 +5961,9 @@ dependencies = [
[[package]]
name = "tracing-core"
-version = "0.1.35"
+version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c"
+checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
]
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 0663a118d..7b32be36a 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -99,7 +99,7 @@ datafusion-functions-nested = { version = "51.0.0" }
[features]
backtrace = ["datafusion/backtrace"]
-default = []
+default = ["hdfs-opendal"]
hdfs = ["datafusion-comet-objectstore-hdfs"]
hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"]
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs
index 57246abf7..2ca1e9cfd 100644
--- a/native/core/src/execution/operators/parquet_writer.rs
+++ b/native/core/src/execution/operators/parquet_writer.rs
@@ -22,9 +22,13 @@ use std::{
fmt,
fmt::{Debug, Formatter},
fs::File,
+ io::Cursor,
sync::Arc,
};
+use opendal::{services::Hdfs, Operator};
+use url::Url;
+
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -49,6 +53,134 @@ use parquet::{
use crate::execution::shuffle::CompressionCodec;
+/// Enum representing different types of Arrow writers based on storage backend
+enum ParquetWriter {
+ /// Writer for local file system
+ LocalFile(ArrowWriter<File>),
+ /// Writer for HDFS or other remote storage (writes to in-memory buffer)
+ /// Contains the Arrow writer, the lazily created opendal::Writer, the HDFS operator,
+ /// and the destination path. The Arrow writer encodes batches to Parquet format in an
+ /// in-memory buffer; the opendal::Writer is created on the first write
+ Remote(
+ ArrowWriter<Cursor<Vec<u8>>>,
+ Option<opendal::Writer>,
+ Operator,
+ String,
+ ),
+}
+
+impl ParquetWriter {
+ /// Write a RecordBatch to the underlying writer
+ async fn write(
+ &mut self,
+ batch: &RecordBatch,
+ ) -> std::result::Result<(), parquet::errors::ParquetError> {
+ match self {
+ ParquetWriter::LocalFile(writer) => writer.write(batch),
+ ParquetWriter::Remote(
+ arrow_parquet_buffer_writer,
+ hdfs_writer_opt,
+ op,
+ output_path,
+ ) => {
+ // Write batch to in-memory buffer
+ arrow_parquet_buffer_writer.write(batch)?;
+
+ // Flush and get the current buffer content
+ arrow_parquet_buffer_writer.flush()?;
+ let cursor = arrow_parquet_buffer_writer.inner_mut();
+ let current_data = cursor.get_ref().clone();
+
+ // Create HDFS writer lazily on first write
+ if hdfs_writer_opt.is_none() {
+ let writer = op.writer(output_path.as_str()).await.map_err(|e| {
+ parquet::errors::ParquetError::External(
+ format!("Failed to create HDFS writer for '{}':
{}", output_path, e)
+ .into(),
+ )
+ })?;
+ *hdfs_writer_opt = Some(writer);
+ }
+
+ // Write the accumulated data to HDFS
+ if let Some(hdfs_writer) = hdfs_writer_opt {
+ hdfs_writer.write(current_data).await.map_err(|e| {
+ parquet::errors::ParquetError::External(
+ format!(
+ "Failed to write batch to HDFS file '{}': {}",
+ output_path, e
+ )
+ .into(),
+ )
+ })?;
+ }
+
+ // Clear the buffer after upload
+ cursor.get_mut().clear();
+ cursor.set_position(0);
+
+ Ok(())
+ }
+ }
+ }
+
+ /// Close the writer and finalize the file
+ async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> {
+ match self {
+ ParquetWriter::LocalFile(writer) => {
+ writer.close()?;
+ Ok(())
+ }
+ ParquetWriter::Remote(
+ arrow_parquet_buffer_writer,
+ mut hdfs_writer_opt,
+ op,
+ output_path,
+ ) => {
+ // Close the arrow writer to finalize parquet format
+ let cursor = arrow_parquet_buffer_writer.into_inner()?;
+ let final_data = cursor.into_inner();
+
+ // Create HDFS writer if not already created
+ if hdfs_writer_opt.is_none() && !final_data.is_empty() {
+ let writer = op.writer(output_path.as_str()).await.map_err(|e| {
+ parquet::errors::ParquetError::External(
+ format!("Failed to create HDFS writer for '{}':
{}", output_path, e)
+ .into(),
+ )
+ })?;
+ hdfs_writer_opt = Some(writer);
+ }
+
+ // Write any remaining data
+ if !final_data.is_empty() {
+ if let Some(mut hdfs_writer) = hdfs_writer_opt {
+ hdfs_writer.write(final_data).await.map_err(|e| {
+ parquet::errors::ParquetError::External(
+ format!(
+ "Failed to write final data to HDFS file
'{}': {}",
+ output_path, e
+ )
+ .into(),
+ )
+ })?;
+
+ // Close the HDFS writer
+ hdfs_writer.close().await.map_err(|e| {
+ parquet::errors::ParquetError::External(
+ format!("Failed to close HDFS writer for '{}':
{}", output_path, e)
+ .into(),
+ )
+ })?;
+ }
+ }
+
+ Ok(())
+ }
+ }
+ }
+}
+
/// Parquet writer operator that writes input batches to a Parquet file
#[derive(Debug)]
pub struct ParquetWriterExec {
@@ -119,6 +251,129 @@ impl ParquetWriterExec {
CompressionCodec::Snappy => Ok(Compression::SNAPPY),
}
}
+
+ /// Create an Arrow writer for the storage scheme inferred from the output file path
+ /// ("hdfs", "s3", or "local")
+ ///
+ /// # Arguments
+ /// * `output_file_path` - The full path to the output file
+ /// * `schema` - The Arrow schema for the Parquet file
+ /// * `props` - Writer properties including compression
+ ///
+ /// # Returns
+ /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
+ /// * `Err(DataFusionError)` - If writer creation fails
+ fn create_arrow_writer(
+ output_file_path: &str,
+ schema: SchemaRef,
+ props: WriterProperties,
+ ) -> Result<ParquetWriter> {
+ // Determine storage scheme from output_file_path
+ let storage_scheme = if output_file_path.starts_with("hdfs://") {
+ "hdfs"
+ } else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
+ "s3"
+ } else {
+ "local"
+ };
+
+ match storage_scheme {
+ "hdfs" => {
+ // Parse the output_file_path to extract namenode and path
+ // Expected format: hdfs://namenode:port/path/to/file
+ let url = Url::parse(output_file_path).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to parse HDFS URL '{}': {}",
+ output_file_path, e
+ ))
+ })?;
+
+ // Extract namenode (scheme + host + port)
+ let namenode = format!(
+ "{}://{}{}",
+ url.scheme(),
+ url.host_str().unwrap_or("localhost"),
+ url.port()
+ .map(|p| format!(":{}", p))
+ .unwrap_or_else(|| ":9000".to_string())
+ );
+
+ // Extract the path (without the scheme and host)
+ let hdfs_path = url.path().to_string();
+
+ // For remote storage (HDFS, S3), write to an in-memory buffer
+ let buffer = Vec::new();
+ let cursor = Cursor::new(buffer);
+ let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
+ .map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to create {} writer: {}",
+ storage_scheme, e
+ ))
+ })?;
+
+ let builder = Hdfs::default().name_node(&namenode);
+ let op = Operator::new(builder)
+ .map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to create HDFS operator for '{}'
(namenode: {}): {}",
+ output_file_path, namenode, e
+ ))
+ })?
+ .finish();
+
+ // HDFS writer will be created lazily on first write
+ // Use only the path part for the HDFS writer
+ Ok(ParquetWriter::Remote(
+ arrow_parquet_buffer_writer,
+ None,
+ op,
+ hdfs_path,
+ ))
+ }
+ "local" => {
+ // For a local file system, write directly to file
+ // Strip file:// or file: prefix if present
+ let local_path = output_file_path
+ .strip_prefix("file://")
+ .or_else(|| output_file_path.strip_prefix("file:"))
+ .unwrap_or(output_file_path);
+
+ // Extract the parent directory from the file path
+ let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| {
+ DataFusionError::Execution(format!(
+ "Failed to extract parent directory from path '{}'",
+ local_path
+ ))
+ })?;
+
+ // Create the parent directory if it doesn't exist
+ std::fs::create_dir_all(output_dir).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to create output directory '{}': {}",
+ output_dir.display(),
+ e
+ ))
+ })?;
+
+ let file = File::create(local_path).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to create output file '{}': {}",
+ local_path, e
+ ))
+ })?;
+
+ let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| {
+ DataFusionError::Execution(format!("Failed to create local file writer: {}", e))
+ })?;
+ Ok(ParquetWriter::LocalFile(writer))
+ }
+ _ => Err(DataFusionError::Execution(format!(
+ "Unsupported storage scheme: {}",
+ storage_scheme
+ ))),
+ }
+ }
}
impl DisplayAs for ParquetWriterExec {
@@ -217,47 +472,23 @@ impl ExecutionPlan for ParquetWriterExec {
.collect();
let output_schema = Arc::new(arrow::datatypes::Schema::new(fields));
- // Strip file:// or file: prefix if present
- let local_path = work_dir
- .strip_prefix("file://")
- .or_else(|| work_dir.strip_prefix("file:"))
- .unwrap_or(&work_dir)
- .to_string();
-
- // Create output directory
- std::fs::create_dir_all(&local_path).map_err(|e| {
- DataFusionError::Execution(format!(
- "Failed to create output directory '{}': {}",
- local_path, e
- ))
- })?;
-
// Generate part file name for this partition
// If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename
let part_file = if let Some(attempt_id) = task_attempt_id {
format!(
"{}/part-{:05}-{:05}.parquet",
- local_path, self.partition_id, attempt_id
+ work_dir, self.partition_id, attempt_id
)
} else {
- format!("{}/part-{:05}.parquet", local_path, self.partition_id)
+ format!("{}/part-{:05}.parquet", work_dir, self.partition_id)
};
- // Create the Parquet file
- let file = File::create(&part_file).map_err(|e| {
- DataFusionError::Execution(format!(
- "Failed to create output file '{}': {}",
- part_file, e
- ))
- })?;
-
// Configure writer properties
let props = WriterProperties::builder()
.set_compression(compression)
.build();
- let mut writer = ArrowWriter::try_new(file, Arc::clone(&output_schema), Some(props))
- .map_err(|e| DataFusionError::Execution(format!("Failed to create writer: {}", e)))?;
+ let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
// Clone schema for use in async closure
let schema_for_write = Arc::clone(&output_schema);
@@ -286,12 +517,12 @@ impl ExecutionPlan for ParquetWriterExec {
batch
};
- writer.write(&renamed_batch).map_err(|e| {
+ writer.write(&renamed_batch).await.map_err(|e| {
DataFusionError::Execution(format!("Failed to write batch:
{}", e))
})?;
}
- writer.close().map_err(|e| {
+ writer.close().await.map_err(|e| {
DataFusionError::Execution(format!("Failed to close writer:
{}", e))
})?;
@@ -322,3 +553,274 @@ impl ExecutionPlan for ParquetWriterExec {
)))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::{Int32Array, StringArray};
+ use arrow::datatypes::{DataType, Field, Schema};
+ use std::sync::Arc;
+
+ /// Helper function to create a test RecordBatch with 1000 rows of (int, string) data
+ /// Example: batch_id 1 -> 0..1000, batch_id 2 -> 1000..2000 (upper bound exclusive)
+ fn create_test_record_batch(batch_id: i32) -> Result<RecordBatch> {
+ assert!(batch_id > 0, "batch_id must be greater than 0");
+ let num_rows = batch_id * 1000;
+
+ let int_array = Int32Array::from_iter_values(((batch_id - 1) * 1000)..num_rows);
+
+ let string_values: Vec<String> = (((batch_id - 1) * 1000)..num_rows)
+ .map(|i| format!("value_{}", i))
+ .collect();
+ let string_array = StringArray::from(string_values);
+
+ // Define schema
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ ]));
+
+ // Create RecordBatch
+ RecordBatch::try_new(schema, vec![Arc::new(int_array), Arc::new(string_array)])
+ .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
+ }
+
+ #[tokio::test]
+ #[cfg(feature = "hdfs-opendal")]
+ #[ignore = "This test requires a running HDFS cluster"]
+ async fn test_write_to_hdfs_sync() -> Result<()> {
+ use opendal::services::Hdfs;
+ use opendal::Operator;
+
+ // Configure HDFS connection
+ let namenode = "hdfs://namenode:9000";
+ let output_path = "/user/test_write/data.parquet";
+
+ // Create OpenDAL HDFS operator
+ let builder = Hdfs::default().name_node(namenode);
+ let op = Operator::new(builder)
+ .map_err(|e| {
+ DataFusionError::Execution(format!("Failed to create HDFS
operator: {}", e))
+ })?
+ .finish();
+
+ let mut hdfs_writer = op.writer(output_path).await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to create HDFS writer:
{}", e))
+ })?;
+
+ let mut buffer = Cursor::new(Vec::new());
+ let mut writer =
+ ArrowWriter::try_new(&mut buffer,
create_test_record_batch(1)?.schema(), None)?;
+
+ for i in 1..=5 {
+ let record_batch = create_test_record_batch(i)?;
+
+ writer.write(&record_batch)?;
+
+ println!(
+ "Successfully wrote 1000 rows to HDFS at {}{}",
+ namenode, output_path
+ );
+ }
+
+ writer.close()?;
+
+ hdfs_writer.write(buffer.into_inner()).await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to write with HDFS
writer: {}", e))
+ })?;
+
+ hdfs_writer.close().await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to close HDFS writer:
{}", e))
+ })?;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[cfg(feature = "hdfs-opendal")]
+ #[ignore = "This test requires a running HDFS cluster"]
+ async fn test_write_to_hdfs_streaming() -> Result<()> {
+ use opendal::services::Hdfs;
+ use opendal::Operator;
+
+ // Configure HDFS connection
+ let namenode = "hdfs://namenode:9000";
+ let output_path = "/user/test_write_streaming/data.parquet";
+
+ // Create OpenDAL HDFS operator
+ let builder = Hdfs::default().name_node(namenode);
+ let op = Operator::new(builder)
+ .map_err(|e| {
+ DataFusionError::Execution(format!("Failed to create HDFS
operator: {}", e))
+ })?
+ .finish();
+
+ // Create a single HDFS writer for the entire file
+ let mut hdfs_writer = op.writer(output_path).await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to create HDFS writer:
{}", e))
+ })?;
+
+ // Create a single ArrowWriter that will be used for all batches
+ let buffer = Cursor::new(Vec::new());
+ let mut writer = ArrowWriter::try_new(buffer, create_test_record_batch(1)?.schema(), None)?;
+
+ // Write each batch and upload to HDFS immediately (streaming approach)
+ for i in 1..=5 {
+ let record_batch = create_test_record_batch(i)?;
+
+ // Write the batch to the parquet writer
+ writer.write(&record_batch)?;
+
+ // Flush the writer to ensure data is written to the buffer
+ writer.flush()?;
+
+ // Get the current buffer content through the writer
+ let cursor = writer.inner_mut();
+ let current_data = cursor.get_ref().clone();
+
+ // Write the accumulated data to HDFS
+ hdfs_writer.write(current_data).await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to write batch {}
to HDFS: {}", i, e))
+ })?;
+
+ // Clear the buffer for the next iteration
+ cursor.get_mut().clear();
+ cursor.set_position(0);
+
+ println!(
+ "Successfully streamed batch {} (1000 rows) to HDFS at {}{}",
+ i, namenode, output_path
+ );
+ }
+
+ // Close the ArrowWriter to finalize the parquet file
+ let cursor = writer.into_inner()?;
+
+ // Write any remaining data from closing the writer
+ let final_data = cursor.into_inner();
+ if !final_data.is_empty() {
+ hdfs_writer.write(final_data).await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to write final data
to HDFS: {}", e))
+ })?;
+ }
+
+ // Close the HDFS writer
+ hdfs_writer.close().await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to close HDFS writer:
{}", e))
+ })?;
+
+ println!(
+ "Successfully completed streaming write of 5 batches (5000 total
rows) to HDFS at {}{}",
+ namenode, output_path
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[cfg(feature = "hdfs-opendal")]
+ #[ignore = "This test requires a running HDFS cluster"]
+ async fn test_parquet_writer_streaming() -> Result<()> {
+ // Configure output path
+ let output_path = "/user/test_parquet_writer_streaming/data.parquet";
+
+ // Configure writer properties
+ let props = WriterProperties::builder()
+ .set_compression(Compression::UNCOMPRESSED)
+ .build();
+
+ // Create ParquetWriter using the create_arrow_writer method
+ // Use full HDFS URL format
+ let full_output_path = format!("hdfs://namenode:9000{}", output_path);
+ let mut writer = ParquetWriterExec::create_arrow_writer(
+ &full_output_path,
+ create_test_record_batch(1)?.schema(),
+ props,
+ )?;
+
+ // Write 5 batches in a loop
+ for i in 1..=5 {
+ let record_batch = create_test_record_batch(i)?;
+
+ writer.write(&record_batch).await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to write batch {}:
{}", i, e))
+ })?;
+
+ println!(
+ "Successfully wrote batch {} (1000 rows) using ParquetWriter",
+ i
+ );
+ }
+
+ // Close the writer
+ writer
+ .close()
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?;
+
+ println!(
+ "Successfully completed ParquetWriter streaming write of 5 batches
(5000 total rows) to HDFS at {}",
+ output_path
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ #[cfg(feature = "hdfs-opendal")]
+ #[ignore = "This test requires a running HDFS cluster"]
+ async fn test_parquet_writer_exec_with_memory_input() -> Result<()> {
+ use datafusion::datasource::memory::MemorySourceConfig;
+ use datafusion::datasource::source::DataSourceExec;
+ use datafusion::prelude::SessionContext;
+
+ // Create 5 batches for the DataSourceExec input
+ let mut batches = Vec::new();
+ for i in 1..=5 {
+ batches.push(create_test_record_batch(i)?);
+ }
+
+ // Get schema from the first batch
+ let schema = batches[0].schema();
+
+ // Create DataSourceExec with MemorySourceConfig containing the 5 batches as a single partition
+ let partitions = vec![batches];
+ let memory_source_config = MemorySourceConfig::try_new(&partitions, schema, None)?;
+ let memory_exec = Arc::new(DataSourceExec::new(Arc::new(memory_source_config)));
+
+ // Create ParquetWriterExec with DataSourceExec as input
+ let output_path = "unused".to_string();
+ let work_dir = "hdfs://namenode:9000/user/test_parquet_writer_exec".to_string();
+ let column_names = vec!["id".to_string(), "name".to_string()];
+
+ let parquet_writer = ParquetWriterExec::try_new(
+ memory_exec,
+ output_path,
+ work_dir,
+ None, // job_id
+ Some(123), // task_attempt_id
+ CompressionCodec::None,
+ 0, // partition_id
+ column_names,
+ )?;
+
+ // Create a session context and execute the plan
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ // Execute partition 0
+ let mut stream = parquet_writer.execute(0, task_ctx)?;
+
+ // Consume the stream (this triggers the write)
+ while let Some(batch_result) = stream.try_next().await? {
+ // The stream should be empty as ParquetWriterExec returns empty batches
+ assert_eq!(batch_result.num_rows(), 0);
+ }
+
+ println!(
+ "Successfully completed ParquetWriterExec test with DataSourceExec
input (5 batches, 5000 total rows)"
+ );
+
+ Ok(())
+ }
+}
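
As a worked example of the namenode/path split performed by create_arrow_writer above, a minimal sketch using the url crate; the host, port, and file name are hypothetical, and the ":9000" fallback mirrors the default applied in the patch:

    use url::Url;

    fn split_hdfs_url(output: &str) -> (String, String) {
        let url = Url::parse(output).expect("valid hdfs url");
        // namenode = scheme + host + port, falling back to :9000 when no port is given
        let namenode = format!(
            "{}://{}{}",
            url.scheme(),
            url.host_str().unwrap_or("localhost"),
            url.port()
                .map(|p| format!(":{}", p))
                .unwrap_or_else(|| ":9000".to_string())
        );
        // the OpenDAL writer is handed only the path component
        (namenode, url.path().to_string())
    }

    fn main() {
        let (namenode, path) = split_hdfs_url("hdfs://namenode:9000/user/out/part-00000-00123.parquet");
        assert_eq!(namenode, "hdfs://namenode:9000");
        assert_eq!(path, "/user/out/part-00000-00123.parquet");
    }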
diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs
index 10ecefad5..2b883bd7d 100644
--- a/native/core/src/lib.rs
+++ b/native/core/src/lib.rs
@@ -39,10 +39,17 @@ use log4rs::{
};
use once_cell::sync::OnceCell;
-#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
+#[cfg(all(
+ not(target_env = "msvc"),
+ feature = "jemalloc",
+ not(feature = "mimalloc")
+))]
use tikv_jemallocator::Jemalloc;
-#[cfg(feature = "mimalloc")]
+#[cfg(all(
+ feature = "mimalloc",
+ not(all(not(target_env = "msvc"), feature = "jemalloc"))
+))]
use mimalloc::MiMalloc;
use errors::{try_unwrap_or_throw, CometError, CometResult};
@@ -55,11 +62,18 @@ pub mod execution;
mod jvm_bridge;
pub mod parquet;
-#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
+#[cfg(all(
+ not(target_env = "msvc"),
+ feature = "jemalloc",
+ not(feature = "mimalloc")
+))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
-#[cfg(feature = "mimalloc")]
+#[cfg(all(
+ feature = "mimalloc",
+ not(all(not(target_env = "msvc"), feature = "jemalloc"))
+))]
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 0b5c45d24..c9a27d7dc 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -369,9 +369,11 @@ fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) ->
}
}
-// Mirrors object_store::parse::parse_url for the hdfs object store
-#[cfg(feature = "hdfs")]
-fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+// Creates an HDFS object store from a URL using the native HDFS implementation
+#[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))]
+fn create_hdfs_object_store(
+ url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
match datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
{
Some(object_store) => {
@@ -385,8 +387,11 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_stor
}
}
+// Creates an HDFS object store from a URL using OpenDAL
#[cfg(feature = "hdfs-opendal")]
-fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+fn create_hdfs_object_store(
+ url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
let name_node = get_name_node_uri(url)?;
let builder = opendal::services::Hdfs::default().name_node(&name_node);
@@ -422,8 +427,11 @@ fn get_name_node_uri(url: &Url) -> Result<String, object_store::Error> {
}
}
+// Stub implementation when HDFS support is not enabled
#[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
-fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+fn create_hdfs_object_store(
+ _url: &Url,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
Err(object_store::Error::Generic {
store: "HadoopFileSystem",
source: "Hdfs support is not enabled in this build".into(),
@@ -454,7 +462,7 @@ pub(crate) fn prepare_object_store_with_configs(
);
let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
- parse_hdfs_url(&url)
+ create_hdfs_object_store(&url)
} else if scheme == "s3" {
objectstore::s3::create_store(&url, object_store_configs,
Duration::from_secs(300))
} else {
@@ -469,24 +477,59 @@ pub(crate) fn prepare_object_store_with_configs(
#[cfg(test)]
mod tests {
- use crate::execution::operators::ExecutionError;
- use crate::parquet::parquet_support::prepare_object_store_with_configs;
+ #[cfg(any(
+ all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+ feature = "hdfs"
+ ))]
use datafusion::execution::object_store::ObjectStoreUrl;
+ #[cfg(any(
+ all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+ feature = "hdfs"
+ ))]
use datafusion::execution::runtime_env::RuntimeEnv;
+ #[cfg(any(
+ all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+ feature = "hdfs"
+ ))]
use object_store::path::Path;
- use std::collections::HashMap;
+ #[cfg(any(
+ all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+ feature = "hdfs"
+ ))]
use std::sync::Arc;
+ #[cfg(any(
+ all(not(feature = "hdfs"), not(feature = "hdfs-opendal")),
+ feature = "hdfs"
+ ))]
use url::Url;
+ #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
+ use crate::execution::operators::ExecutionError;
+ #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
+ use std::collections::HashMap;
+
/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+ #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
pub(crate) fn prepare_object_store(
runtime_env: Arc<RuntimeEnv>,
url: String,
) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+ use crate::parquet::parquet_support::prepare_object_store_with_configs;
+ prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
+ }
+
+ /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+ #[cfg(feature = "hdfs")]
+ pub(crate) fn prepare_object_store(
+ runtime_env: Arc<RuntimeEnv>,
+ url: String,
+ ) -> Result<(ObjectStoreUrl, Path), crate::execution::operators::ExecutionError> {
+ use crate::parquet::parquet_support::prepare_object_store_with_configs;
+ use std::collections::HashMap;
prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
}
- #[cfg(not(feature = "hdfs"))]
+ #[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
#[test]
fn test_prepare_object_store() {
use crate::execution::operators::ExecutionError;
diff --git a/native/fs-hdfs/Cargo.toml b/native/fs-hdfs/Cargo.toml
index 012f8014a..d2aeb44a2 100644
--- a/native/fs-hdfs/Cargo.toml
+++ b/native/fs-hdfs/Cargo.toml
@@ -32,7 +32,7 @@ publish = false
readme = "README.md"
[lib]
-name = "hdfs"
+name = "fs_hdfs"
path = "src/lib.rs"
[features]
diff --git a/native/hdfs/Cargo.toml b/native/hdfs/Cargo.toml
index dc8f970ef..c66d71c20 100644
--- a/native/hdfs/Cargo.toml
+++ b/native/hdfs/Cargo.toml
@@ -33,7 +33,7 @@ edition = { workspace = true }
[features]
default = ["hdfs", "try_spawn_blocking"]
-hdfs = ["fs-hdfs"]
+hdfs = ["fs_hdfs"]
hdfs3 = ["fs-hdfs3"]
# Used for trying to spawn a blocking thread for implementing each object store interface when running in a tokio runtime
try_spawn_blocking = []
@@ -42,7 +42,7 @@ try_spawn_blocking = []
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
-fs-hdfs = { version = "^0.1.12", optional = true }
+fs_hdfs = { package = "datafusion-comet-fs-hdfs3", path = "../fs-hdfs", optional = true }
fs-hdfs3 = { version = "^0.1.12", optional = true }
futures = { workspace = true }
object_store = { workspace = true }
diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs
index b49e87942..a93774cff 100644
--- a/native/hdfs/src/object_store/hdfs.rs
+++ b/native/hdfs/src/object_store/hdfs.rs
@@ -26,9 +26,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
+use fs_hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
+use fs_hdfs::walkdir::HdfsWalkDir;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
-use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
-use hdfs::walkdir::HdfsWalkDir;
use object_store::{
path::{self, Path},
Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
@@ -422,7 +422,7 @@ impl ObjectStore for HadoopFileSystem {
hdfs.delete(&to, false).map_err(to_error)?;
}
- hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
+ fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
.map_err(to_error)?;
Ok(())
@@ -437,7 +437,7 @@ impl ObjectStore for HadoopFileSystem {
let to = HadoopFileSystem::path_to_filesystem(to);
maybe_spawn_blocking(move || {
- hdfs.rename(&from, &to).map_err(to_error)?;
+ hdfs.rename(&from, &to, true).map_err(to_error)?;
Ok(())
})
@@ -459,7 +459,7 @@ impl ObjectStore for HadoopFileSystem {
});
}
- hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
+ fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
.map_err(to_error)?;
Ok(())
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
index 7fdf05521..834932984 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
@@ -52,8 +52,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
case cmd: InsertIntoHadoopFsRelationCommand =>
cmd.fileFormat match {
case _: ParquetFileFormat =>
- if (!cmd.outputPath.toString.startsWith("file:")) {
- return Unsupported(Some("Only local filesystem output paths are
supported"))
+ if (!cmd.outputPath.toString.startsWith("file:") &&
!cmd.outputPath.toString
+ .startsWith("hdfs:")) {
+ return Unsupported(Some("Only HDFS/local filesystems output
paths are supported"))
}
if (cmd.bucketSpec.isDefined) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]