This is an automated email from the ASF dual-hosted git repository.

hoslo pushed a commit to branch fix-close
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 0c1c36bd5d9803a8d73a75566abadc7678a4171c
Author: Xuanwo <[email protected]>
AuthorDate: Sun Feb 4 10:12:49 2024 +0800

    fix(services/fs,hdfs): fix poll_close when retry
---
 core/fuzz/Cargo.toml             |  4 ++--
 core/src/services/fs/writer.rs   | 51 +++++++++++++++++++++++++++++-----------
 core/src/services/hdfs/writer.rs | 22 ++++++++++++-----
 3 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/core/fuzz/Cargo.toml b/core/fuzz/Cargo.toml
index 2e8a9fb050..75713ab922 100644
--- a/core/fuzz/Cargo.toml
+++ b/core/fuzz/Cargo.toml
@@ -17,12 +17,12 @@
 
 [package]
 edition = "2021"
+license = "Apache-2.0"
 name = "opendal-fuzz"
 publish = false
+rust-version = "1.67"
 version = "0.0.0"
 
-license.workspace = true
-
 [package.metadata]
 cargo-fuzz = true
 
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 12e5a4fffe..a8ebb4fd3c 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -35,7 +35,7 @@ pub struct FsWriter<F> {
     tmp_path: Option<PathBuf>,
 
     f: Option<F>,
-    fut: Option<BoxFuture<'static, Result<()>>>,
+    fut: Option<BoxFuture<'static, (F, Result<()>)>>,
 }
 
 impl<F> FsWriter<F> {
@@ -69,23 +69,35 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             if let Some(fut) = self.fut.as_mut() {
                 let res = ready!(fut.poll_unpin(cx));
                 self.fut = None;
-                return Poll::Ready(res);
+                if let Err(e) = res.1 {
+                    self.f = Some(res.0);
+                    return Poll::Ready(Err(e));
+                }
+                return Poll::Ready(Ok(()));
             }
 
             let mut f = self.f.take().expect("FsWriter must be initialized");
             let tmp_path = self.tmp_path.clone();
             let target_path = self.target_path.clone();
             self.fut = Some(Box::pin(async move {
-                f.flush().await.map_err(new_std_io_error)?;
-                f.sync_all().await.map_err(new_std_io_error)?;
+                if let Err(e) = f.flush().await.map_err(new_std_io_error) {
+                    // Reserve the original error for retry.
+                    return (f, Err(e));
+                }
+                if let Err(e) = f.sync_all().await.map_err(new_std_io_error) {
+                    return (f, Err(e));
+                }
 
                 if let Some(tmp_path) = &tmp_path {
-                    tokio::fs::rename(tmp_path, &target_path)
+                    if let Err(e) = tokio::fs::rename(tmp_path, &target_path)
                         .await
-                        .map_err(new_std_io_error)?;
+                        .map_err(new_std_io_error)
+                    {
+                        return (f, Err(e));
+                    }
                 }
 
-                Ok(())
+                (f, Ok(()))
             }));
         }
     }
@@ -95,21 +107,32 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             if let Some(fut) = self.fut.as_mut() {
                 let res = ready!(fut.poll_unpin(cx));
                 self.fut = None;
-                return Poll::Ready(res);
+                if let Err(e) = res.1 {
+                    self.f = Some(res.0);
+                    return Poll::Ready(Err(e));
+                }
+                return Poll::Ready(Ok(()));
             }
 
-            let _ = self.f.take().expect("FsWriter must be initialized");
+            let f = self.f.take().expect("FsWriter must be initialized");
             let tmp_path = self.tmp_path.clone();
             self.fut = Some(Box::pin(async move {
                 if let Some(tmp_path) = &tmp_path {
-                    tokio::fs::remove_file(tmp_path)
+                    if let Err(e) = tokio::fs::remove_file(tmp_path)
                         .await
                         .map_err(new_std_io_error)
+                    {
+                        return (f, Err(e));
+                    }
+                    (f, Ok(()))
                 } else {
-                    Err(Error::new(
-                        ErrorKind::Unsupported,
-                        "Fs doesn't support abort if atomic_write_dir is not set",
-                    ))
+                    (
+                        f,
+                        Err(Error::new(
+                            ErrorKind::Unsupported,
+                            "Fs doesn't support abort if atomic_write_dir is not set",
+                        )),
+                    )
                 }
             }));
         }
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 6c77097d84..535e97c38b 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -36,7 +36,7 @@ pub struct HdfsWriter<F> {
     tmp_path: Option<String>,
     f: Option<F>,
     client: Arc<hdrs::Client>,
-    fut: Option<BoxFuture<'static, Result<()>>>,
+    fut: Option<BoxFuture<'static, (F, Result<()>)>>,
 }
 
 /// # Safety
@@ -76,7 +76,11 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
             if let Some(fut) = self.fut.as_mut() {
                 let res = ready!(fut.poll_unpin(cx));
                 self.fut = None;
-                return Poll::Ready(res);
+                if let Err(e) = res.1 {
+                    self.f = Some(res.0);
+                    return Poll::Ready(Err(e));
+                }
+                return Poll::Ready(Ok(()));
             }
 
             let mut f = self.f.take().expect("HdfsWriter must be initialized");
@@ -86,15 +90,21 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
             let client = self.client.clone();
 
             self.fut = Some(Box::pin(async move {
-                f.close().await.map_err(new_std_io_error)?;
+                if let Err(e) = f.close().await.map_err(new_std_io_error) {
+                    // Reserve the original error for retry.
+                    return (f, Err(e));
+                }
 
                 if let Some(tmp_path) = tmp_path {
-                    client
+                    if let Err(e) = client
                         .rename_file(&tmp_path, &target_path)
-                        .map_err(new_std_io_error)?;
+                        .map_err(new_std_io_error)
+                    {
+                        return (f, Err(e));
+                    }
                 }
 
-                Ok(())
+                (f, Ok(()))
             }));
         }
     }

Reply via email to