This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c9fcb80a3 fix(service/hdfs): enable hdfs append support (#3600)
c9fcb80a3 is described below
commit c9fcb80a35c357aa23dc2f23cb937b7bd4f0a3fb
Author: Qingwen Zhao <[email protected]>
AuthorDate: Fri Nov 17 22:43:01 2023 +0800
fix(service/hdfs): enable hdfs append support (#3600)
* enable hdfs blocking append
* add enable_append
* change enable_append=false
* update
* update doc
---
.github/workflows/service_test_hdfs.yml | 10 +++-
core/src/services/hdfs/backend.rs | 99 +++++++++++++++++++++------------
core/src/services/hdfs/docs.md | 8 ++-
core/tests/behavior/append.rs | 3 +
fixtures/hdfs/hdfs-site.xml | 27 +++++++++
5 files changed, 108 insertions(+), 39 deletions(-)
diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index 05df53510..8f9a26b8b 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -63,13 +63,15 @@ jobs:
run: |
export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
-
+ cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
+
cargo test behavior --features tests,services-hdfs
env:
HADOOP_HOME: "/home/runner/hadoop-3.3.5"
OPENDAL_TEST: hdfs
OPENDAL_HDFS_ROOT: /tmp/opendal/
OPENDAL_HDFS_NAME_NODE: default
+ OPENDAL_HDFS_ENABLE_APPEND: false
hdfs-cluster:
runs-on: ubuntu-latest
@@ -87,6 +89,8 @@ jobs:
-e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
-e CORE_CONF_hadoop_http_staticuser_user=root \
-e HDFS_CONF_dfs_permissions_enabled=false \
+ -e HDFS_CONF_dfs_support_append=true \
+ -e HDFS_CONF_dfs_replication=1 \
bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
docker run -d \
@@ -96,6 +100,8 @@ jobs:
-e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
-e CORE_CONF_hadoop_http_staticuser_user=root \
-e HDFS_CONF_dfs_permissions_enabled=false \
+ -e HDFS_CONF_dfs_support_append=true \
+ -e HDFS_CONF_dfs_replication=1 \
bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
curl --retry 30 --retry-delay 1 --retry-connrefused http://localhost:9870
@@ -121,6 +127,7 @@ jobs:
run: |
export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
+ cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
cargo test behavior --features tests,services-hdfs
env:
@@ -128,3 +135,4 @@ jobs:
OPENDAL_TEST: hdfs
OPENDAL_HDFS_ROOT: /tmp/opendal/
OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
+ OPENDAL_HDFS_ENABLE_APPEND: true
diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index f208cd000..ea857d4f6 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -22,6 +22,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
+use futures::AsyncWriteExt;
use log::debug;
use super::lister::HdfsLister;
@@ -31,12 +32,13 @@ use crate::*;
/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support.
#[doc = include_str!("docs.md")]
-#[derive(Debug, Default)]
+#[derive(Default, Debug)]
pub struct HdfsBuilder {
root: Option<String>,
name_node: Option<String>,
kerberos_ticket_cache_path: Option<String>,
user: Option<String>,
+ enable_append: bool,
}
impl HdfsBuilder {
@@ -85,6 +87,14 @@ impl HdfsBuilder {
}
self
}
+
+ /// Enable append capacity of this backend.
+ ///
+ /// This should be disabled when HDFS runs in non-distributed mode.
+ pub fn enable_append(&mut self, enable_append: bool) -> &mut Self {
+ self.enable_append = enable_append;
+ self
+ }
}
impl Builder for HdfsBuilder {
@@ -99,6 +109,9 @@ impl Builder for HdfsBuilder {
map.get("kerberos_ticket_cache_path")
.map(|v| builder.kerberos_ticket_cache_path(v));
map.get("user").map(|v| builder.user(v));
+ map.get("enable_append").map(|v| {
+ builder.enable_append(v.parse().expect("enable_append should be true or false"))
+ });
builder
}
@@ -140,6 +153,7 @@ impl Builder for HdfsBuilder {
Ok(HdfsBackend {
root,
client: Arc::new(client),
+ enable_append: self.enable_append,
})
}
}
@@ -149,6 +163,7 @@ impl Builder for HdfsBuilder {
pub struct HdfsBackend {
root: String,
client: Arc<hdrs::Client>,
+ enable_append: bool,
}
/// hdrs::Client is thread-safe.
@@ -175,8 +190,7 @@ impl Accessor for HdfsBackend {
read_can_seek: true,
write: true,
- // TODO: wait for https://github.com/apache/incubator-opendal/pull/2715
- write_can_append: false,
+ write_can_append: self.enable_append,
create_dir: true,
delete: true,
@@ -220,20 +234,26 @@ impl Accessor for HdfsBackend {
async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let p = build_rooted_abs_path(&self.root, path);
- let parent = PathBuf::from(&p)
- .parent()
- .ok_or_else(|| {
- Error::new(
- ErrorKind::Unexpected,
- "path should have parent but not, it must be malformed",
- )
- .with_context("input", &p)
- })?
- .to_path_buf();
+ if let Err(err) = self.client.metadata(&p) {
+ // Early return if other error happened.
+ if err.kind() != io::ErrorKind::NotFound {
+ return Err(new_std_io_error(err));
+ }
- self.client
- .create_dir(&parent.to_string_lossy())
- .map_err(new_std_io_error)?;
+ let parent = get_parent(&p);
+
+ self.client.create_dir(parent).map_err(new_std_io_error)?;
+
+ let mut f = self
+ .client
+ .open_file()
+ .create(true)
+ .write(true)
+ .async_open(&p)
+ .await
+ .map_err(new_std_io_error)?;
+ f.close().await.map_err(new_std_io_error)?;
+ }
let mut open_options = self.client.open_file();
open_options.create(true);
@@ -386,31 +406,36 @@ impl Accessor for HdfsBackend {
Ok((RpRead::new(), r))
}
- fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+ fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let p = build_rooted_abs_path(&self.root, path);
- let parent = PathBuf::from(&p)
- .parent()
- .ok_or_else(|| {
- Error::new(
- ErrorKind::Unexpected,
- "path should have parent but not, it must be malformed",
- )
- .with_context("input", &p)
- })?
- .to_path_buf();
+ if let Err(err) = self.client.metadata(&p) {
+ // Early return if other error happened.
+ if err.kind() != io::ErrorKind::NotFound {
+ return Err(new_std_io_error(err));
+ }
- self.client
- .create_dir(&parent.to_string_lossy())
- .map_err(new_std_io_error)?;
+ let parent = get_parent(&p);
- let f = self
- .client
- .open_file()
- .create(true)
- .write(true)
- .open(&p)
- .map_err(new_std_io_error)?;
+ self.client.create_dir(parent).map_err(new_std_io_error)?;
+
+ self.client
+ .open_file()
+ .create(true)
+ .write(true)
+ .open(&p)
+ .map_err(new_std_io_error)?;
+ }
+
+ let mut open_options = self.client.open_file();
+ open_options.create(true);
+ if op.append() {
+ open_options.append(true);
+ } else {
+ open_options.write(true);
+ }
+
+ let f = open_options.open(&p).map_err(new_std_io_error)?;
Ok((RpWrite::new(), HdfsWriter::new(f)))
}
diff --git a/core/src/services/hdfs/docs.md b/core/src/services/hdfs/docs.md
index 6fb1e09b7..c9289159f 100644
--- a/core/src/services/hdfs/docs.md
+++ b/core/src/services/hdfs/docs.md
@@ -15,7 +15,7 @@ This service can be used to:
- [ ] ~~scan~~
- [ ] ~~presign~~
- [x] blocking
-- [ ] append
+- [x] append
## Differences with webhdfs
@@ -31,6 +31,7 @@ HDFS support needs to enable feature `services-hdfs`.
- `name_node`: Set the name node for backend.
- `kerberos_ticket_cache_path`: Set the kerberos ticket cache path for backend, this should be gotten by `klist` after `kinit`
- `user`: Set the user for backend
+- `enable_append`: enable the append capacity. Default is false.
Refer to [`HdfsBuilder`]'s public API docs for more information.
@@ -125,6 +126,11 @@ async fn main() -> Result<()> {
//
// NOTE: the root must be absolute path.
builder.root("/tmp");
+
+ // Enable the append capacity for hdfs.
+ //
+ // Note: HDFS run in non-distributed mode doesn't support append.
+ builder.enable_append(true);
// `Accessor` provides the low level APIs, we will use `Operator` normally.
let op: Operator = Operator::new(builder)?.finish();
diff --git a/core/tests/behavior/append.rs b/core/tests/behavior/append.rs
index 91ed3bee3..5f535e4c1 100644
--- a/core/tests/behavior/append.rs
+++ b/core/tests/behavior/append.rs
@@ -55,6 +55,9 @@ pub async fn test_append_create_append(op: Operator) -> Result<()> {
.await
.expect("append file first time must success");
+ let meta = op.stat(&path).await?;
+ assert_eq!(meta.content_length(), size_one as u64);
+
op.write_with(&path, content_two.clone())
.append(true)
.await
diff --git a/fixtures/hdfs/hdfs-site.xml b/fixtures/hdfs/hdfs-site.xml
new file mode 100644
index 000000000..cf83c2d57
--- /dev/null
+++ b/fixtures/hdfs/hdfs-site.xml
@@ -0,0 +1,27 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<configuration>
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
+</configuration>