This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c9fcb80a3 fix(service/hdfs): enable hdfs append support (#3600)
c9fcb80a3 is described below

commit c9fcb80a35c357aa23dc2f23cb937b7bd4f0a3fb
Author: Qingwen Zhao <[email protected]>
AuthorDate: Fri Nov 17 22:43:01 2023 +0800

    fix(service/hdfs): enable hdfs append support (#3600)
    
    * enable hdfs blocking append
    
    * add enable_append
    
    * change enable_append=false
    
    * update
    
    * update doc
---
 .github/workflows/service_test_hdfs.yml | 10 +++-
 core/src/services/hdfs/backend.rs       | 99 +++++++++++++++++++++------------
 core/src/services/hdfs/docs.md          |  8 ++-
 core/tests/behavior/append.rs           |  3 +
 fixtures/hdfs/hdfs-site.xml             | 27 +++++++++
 5 files changed, 108 insertions(+), 39 deletions(-)

diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index 05df53510..8f9a26b8b 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -63,13 +63,15 @@ jobs:
         run: |
           export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
          export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
-
+          cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
+          
           cargo test behavior --features tests,services-hdfs
         env:
           HADOOP_HOME: "/home/runner/hadoop-3.3.5"
           OPENDAL_TEST: hdfs
           OPENDAL_HDFS_ROOT: /tmp/opendal/
           OPENDAL_HDFS_NAME_NODE: default
+          OPENDAL_HDFS_ENABLE_APPEND: false
 
   hdfs-cluster:
     runs-on: ubuntu-latest
@@ -87,6 +89,8 @@ jobs:
             -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
             -e CORE_CONF_hadoop_http_staticuser_user=root \
             -e HDFS_CONF_dfs_permissions_enabled=false \
+            -e HDFS_CONF_dfs_support_append=true \
+            -e HDFS_CONF_dfs_replication=1 \
             bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
 
           docker run -d \
@@ -96,6 +100,8 @@ jobs:
             -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
             -e CORE_CONF_hadoop_http_staticuser_user=root \
             -e HDFS_CONF_dfs_permissions_enabled=false \
+            -e HDFS_CONF_dfs_support_append=true \
+            -e HDFS_CONF_dfs_replication=1 \
             bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
 
           curl --retry 30 --retry-delay 1 --retry-connrefused 
http://localhost:9870
@@ -121,6 +127,7 @@ jobs:
         run: |
           export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
          export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
+          cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
 
           cargo test behavior --features tests,services-hdfs
         env:
@@ -128,3 +135,4 @@ jobs:
           OPENDAL_TEST: hdfs
           OPENDAL_HDFS_ROOT: /tmp/opendal/
           OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
+          OPENDAL_HDFS_ENABLE_APPEND: true
diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index f208cd000..ea857d4f6 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -22,6 +22,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use futures::AsyncWriteExt;
 use log::debug;
 
 use super::lister::HdfsLister;
@@ -31,12 +32,13 @@ use crate::*;
 
 /// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support.
 #[doc = include_str!("docs.md")]
-#[derive(Debug, Default)]
+#[derive(Default, Debug)]
 pub struct HdfsBuilder {
     root: Option<String>,
     name_node: Option<String>,
     kerberos_ticket_cache_path: Option<String>,
     user: Option<String>,
+    enable_append: bool,
 }
 
 impl HdfsBuilder {
@@ -85,6 +87,14 @@ impl HdfsBuilder {
         }
         self
     }
+
+    /// Enable append capability of this backend.
+    ///
+    /// This should be disabled when HDFS runs in non-distributed mode.
+    pub fn enable_append(&mut self, enable_append: bool) -> &mut Self {
+        self.enable_append = enable_append;
+        self
+    }
 }
 
 impl Builder for HdfsBuilder {
@@ -99,6 +109,9 @@ impl Builder for HdfsBuilder {
         map.get("kerberos_ticket_cache_path")
             .map(|v| builder.kerberos_ticket_cache_path(v));
         map.get("user").map(|v| builder.user(v));
+        map.get("enable_append").map(|v| {
+            builder.enable_append(v.parse().expect("enable_append should be true or false"))
+        });
 
         builder
     }
@@ -140,6 +153,7 @@ impl Builder for HdfsBuilder {
         Ok(HdfsBackend {
             root,
             client: Arc::new(client),
+            enable_append: self.enable_append,
         })
     }
 }
@@ -149,6 +163,7 @@ impl Builder for HdfsBuilder {
 pub struct HdfsBackend {
     root: String,
     client: Arc<hdrs::Client>,
+    enable_append: bool,
 }
 
 /// hdrs::Client is thread-safe.
@@ -175,8 +190,7 @@ impl Accessor for HdfsBackend {
                 read_can_seek: true,
 
                 write: true,
-                // TODO: wait for https://github.com/apache/incubator-opendal/pull/2715
-                write_can_append: false,
+                write_can_append: self.enable_append,
 
                 create_dir: true,
                 delete: true,
@@ -220,20 +234,26 @@ impl Accessor for HdfsBackend {
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let parent = PathBuf::from(&p)
-            .parent()
-            .ok_or_else(|| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    "path should have parent but not, it must be malformed",
-                )
-                .with_context("input", &p)
-            })?
-            .to_path_buf();
+        if let Err(err) = self.client.metadata(&p) {
+            // Early return if other error happened.
+            if err.kind() != io::ErrorKind::NotFound {
+                return Err(new_std_io_error(err));
+            }
 
-        self.client
-            .create_dir(&parent.to_string_lossy())
-            .map_err(new_std_io_error)?;
+            let parent = get_parent(&p);
+
+            self.client.create_dir(parent).map_err(new_std_io_error)?;
+
+            let mut f = self
+                .client
+                .open_file()
+                .create(true)
+                .write(true)
+                .async_open(&p)
+                .await
+                .map_err(new_std_io_error)?;
+            f.close().await.map_err(new_std_io_error)?;
+        }
 
         let mut open_options = self.client.open_file();
         open_options.create(true);
@@ -386,31 +406,36 @@ impl Accessor for HdfsBackend {
         Ok((RpRead::new(), r))
     }
 
-    fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+    fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let parent = PathBuf::from(&p)
-            .parent()
-            .ok_or_else(|| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    "path should have parent but not, it must be malformed",
-                )
-                .with_context("input", &p)
-            })?
-            .to_path_buf();
+        if let Err(err) = self.client.metadata(&p) {
+            // Early return if other error happened.
+            if err.kind() != io::ErrorKind::NotFound {
+                return Err(new_std_io_error(err));
+            }
 
-        self.client
-            .create_dir(&parent.to_string_lossy())
-            .map_err(new_std_io_error)?;
+            let parent = get_parent(&p);
 
-        let f = self
-            .client
-            .open_file()
-            .create(true)
-            .write(true)
-            .open(&p)
-            .map_err(new_std_io_error)?;
+            self.client.create_dir(parent).map_err(new_std_io_error)?;
+
+            self.client
+                .open_file()
+                .create(true)
+                .write(true)
+                .open(&p)
+                .map_err(new_std_io_error)?;
+        }
+
+        let mut open_options = self.client.open_file();
+        open_options.create(true);
+        if op.append() {
+            open_options.append(true);
+        } else {
+            open_options.write(true);
+        }
+
+        let f = open_options.open(&p).map_err(new_std_io_error)?;
 
         Ok((RpWrite::new(), HdfsWriter::new(f)))
     }
diff --git a/core/src/services/hdfs/docs.md b/core/src/services/hdfs/docs.md
index 6fb1e09b7..c9289159f 100644
--- a/core/src/services/hdfs/docs.md
+++ b/core/src/services/hdfs/docs.md
@@ -15,7 +15,7 @@ This service can be used to:
 - [ ] ~~scan~~
 - [ ] ~~presign~~
 - [x] blocking
-- [ ] append
+- [x] append
 
 ## Differences with webhdfs
 
@@ -31,6 +31,7 @@ HDFS support needs to enable feature `services-hdfs`.
 - `name_node`: Set the name node for backend.
 - `kerberos_ticket_cache_path`: Set the kerberos ticket cache path for backend, this should be gotten by `klist` after `kinit`
 - `user`: Set the user for backend
+- `enable_append`: enable the append capability. Default is false.
 
 Refer to [`HdfsBuilder`]'s public API docs for more information.
 
@@ -125,6 +126,11 @@ async fn main() -> Result<()> {
     //
     // NOTE: the root must be absolute path.
     builder.root("/tmp");
+    
+    // Enable the append capability for hdfs.
+    //
+    // Note: HDFS running in non-distributed mode doesn't support append.
+    builder.enable_append(true);
 
     // `Accessor` provides the low level APIs, we will use `Operator` normally.
     let op: Operator = Operator::new(builder)?.finish();
diff --git a/core/tests/behavior/append.rs b/core/tests/behavior/append.rs
index 91ed3bee3..5f535e4c1 100644
--- a/core/tests/behavior/append.rs
+++ b/core/tests/behavior/append.rs
@@ -55,6 +55,9 @@ pub async fn test_append_create_append(op: Operator) -> Result<()> {
         .await
         .expect("append file first time must success");
 
+    let meta = op.stat(&path).await?;
+    assert_eq!(meta.content_length(), size_one as u64);
+
     op.write_with(&path, content_two.clone())
         .append(true)
         .await
diff --git a/fixtures/hdfs/hdfs-site.xml b/fixtures/hdfs/hdfs-site.xml
new file mode 100644
index 000000000..cf83c2d57
--- /dev/null
+++ b/fixtures/hdfs/hdfs-site.xml
@@ -0,0 +1,27 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+    </property>
+</configuration>

Reply via email to