This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2a281ae9b fix(integrations/unftp-sbe): properly shutdown after write
(#6601)
2a281ae9b is described below
commit 2a281ae9bebe1c6bc600eb3cb1ec454c2f1a9c5c
Author: Valentin Iovene <[email protected]>
AuthorDate: Tue Sep 30 15:53:46 2025 +0200
fix(integrations/unftp-sbe): properly shutdown after write (#6601)
* fix(integrations/unftp-sbe): properly shutdown after write
The previous implementation was not working: files were not being written to the
object storage, and writes were failing silently, with only a warning from
opendal. The compat writer should be shut down properly after writing, as
detailed in https://github.com/apache/opendal/issues/4926.
* rustfmt
* better error handling
* fix error handling
* small fix
* better error handling
---
integrations/unftp-sbe/src/lib.rs | 25 ++++++++++++++++++++++---
1 file changed, 22 insertions(+), 3 deletions(-)
diff --git a/integrations/unftp-sbe/src/lib.rs
b/integrations/unftp-sbe/src/lib.rs
index 415c5bc5e..d48733523 100644
--- a/integrations/unftp-sbe/src/lib.rs
+++ b/integrations/unftp-sbe/src/lib.rs
@@ -62,9 +62,10 @@ use std::fmt::Debug;
use std::path::{Path, PathBuf};
use libunftp::auth::UserDetail;
-use libunftp::storage::{self, StorageBackend};
+use libunftp::storage::{self, Error, StorageBackend};
use opendal::Operator;
+use tokio::io::AsyncWriteExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt,
FuturesAsyncWriteCompatExt};
#[derive(Debug, Clone)]
@@ -210,8 +211,26 @@ impl<User: UserDetail> StorageBackend<User> for
OpendalStorage {
.map_err(convert_err)?
.into_futures_async_write()
.compat_write();
- let len = tokio::io::copy(&mut input, &mut w).await?;
- Ok(len)
+ let copy_result = tokio::io::copy(&mut input, &mut w).await;
+ let shutdown_result = w.shutdown().await;
+ match (copy_result, shutdown_result) {
+ (Ok(len), Ok(())) => Ok(len),
+ (Err(copy_err), Ok(())) => Err(Error::new(
+ storage::ErrorKind::LocalError,
+ format!("Failed to copy data: {}", copy_err),
+ )),
+ (Ok(_), Err(shutdown_err)) => Err(Error::new(
+ storage::ErrorKind::LocalError,
+ format!("Failed to shutdown writer: {}", shutdown_err),
+ )),
+ (Err(copy_err), Err(shutdown_err)) => Err(Error::new(
+ storage::ErrorKind::LocalError,
+ format!(
+ "Failed to copy data: {} AND failed to shutdown writer:
{}",
+ copy_err, shutdown_err
+ ),
+ )),
+ }
}
async fn del<P: AsRef<Path> + Send + Debug>(&self, _: &User, path: P) ->
storage::Result<()> {