This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b906d8a524 fix(unftp-sbe): remove buffer for get (#4775)
b906d8a524 is described below
commit b906d8a5245ac172c3436e7a490c0898eb76f58b
Author: Pop <[email protected]>
AuthorDate: Thu Jun 20 15:25:41 2024 +0900
fix(unftp-sbe): remove buffer for get (#4775)
* remove buffer for get
* remove unnecessary deps
---
integrations/unftp-sbe/Cargo.lock | 1 -
integrations/unftp-sbe/Cargo.toml | 1 -
integrations/unftp-sbe/src/lib.rs | 35 ++++++++++-------------------------
3 files changed, 10 insertions(+), 27 deletions(-)
diff --git a/integrations/unftp-sbe/Cargo.lock
b/integrations/unftp-sbe/Cargo.lock
index 0d4613c253..5fdbc6cbed 100644
--- a/integrations/unftp-sbe/Cargo.lock
+++ b/integrations/unftp-sbe/Cargo.lock
@@ -1927,7 +1927,6 @@ name = "unftp-sbe-opendal"
version = "0.0.0"
dependencies = [
"async-trait",
- "bytes",
"libunftp",
"opendal",
"tokio",
diff --git a/integrations/unftp-sbe/Cargo.toml
b/integrations/unftp-sbe/Cargo.toml
index 2c696861f1..06dcf30cfc 100644
--- a/integrations/unftp-sbe/Cargo.toml
+++ b/integrations/unftp-sbe/Cargo.toml
@@ -28,7 +28,6 @@ version = "0.0.0"
[dependencies]
async-trait = "0.1.80"
-bytes = "1.6.0"
libunftp = "0.20.0"
opendal = { version = "0.47.0", path = "../../core" }
tokio = { version = "1.38.0", default-features = false, features = ["io-util"]
}
diff --git a/integrations/unftp-sbe/src/lib.rs
b/integrations/unftp-sbe/src/lib.rs
index aabaad8fe8..0f9658c119 100644
--- a/integrations/unftp-sbe/src/lib.rs
+++ b/integrations/unftp-sbe/src/lib.rs
@@ -17,15 +17,12 @@
use std::fmt::Debug;
use std::path::{Path, PathBuf};
-use std::task::Poll;
-use bytes::{Buf, BufMut};
use libunftp::auth::UserDetail;
use libunftp::storage::{self, StorageBackend};
-use opendal::{Buffer, Operator};
+use opendal::Operator;
-use tokio::io::AsyncRead;
-use tokio_util::compat::FuturesAsyncWriteCompatExt;
+use tokio_util::compat::{FuturesAsyncReadCompatExt,
FuturesAsyncWriteCompatExt};
#[derive(Debug, Clone)]
pub struct OpendalStorage {
@@ -38,21 +35,6 @@ impl OpendalStorage {
}
}
-/// A wrapper around [`Buffer`] to implement [`tokio::io::AsyncRead`].
-pub struct IoBuffer(Buffer);
-
-impl AsyncRead for IoBuffer {
- fn poll_read(
- mut self: std::pin::Pin<&mut Self>,
- _: &mut std::task::Context<'_>,
- buf: &mut tokio::io::ReadBuf<'_>,
- ) -> Poll<std::io::Result<()>> {
- let len = std::io::copy(&mut self.as_mut().0.by_ref().reader(), &mut
buf.writer())?;
- self.0.advance(len as usize);
- Poll::Ready(Ok(()))
- }
-}
-
/// A wrapper around [`opendal::Metadata`] to implement
[`libunftp::storage::Metadata`].
pub struct OpendalMetadata(opendal::Metadata);
@@ -155,13 +137,16 @@ impl<User: UserDetail> StorageBackend<User> for
OpendalStorage {
path: P,
start_pos: u64,
) -> storage::Result<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>> {
- let buf = self
+ let reader = self
.op
- .read_with(convert_path(path.as_ref())?)
- .range(start_pos..)
+ .reader(convert_path(path.as_ref())?)
.await
- .map_err(convert_err)?;
- Ok(Box::new(IoBuffer(buf)))
+ .map_err(convert_err)?
+ .into_futures_async_read(start_pos..)
+ .await
+ .map_err(convert_err)?
+ .compat();
+ Ok(Box::new(reader))
}
async fn put<