NobodyXu commented on code in PR #2192:
URL:
https://github.com/apache/incubator-opendal/pull/2192#discussion_r1185726965
##########
core/src/services/sftp/pager.rs:
##########
@@ -15,77 +15,81 @@
// specific language governing permissions and limitations
// under the License.
+use std::pin::Pin;
+
use async_trait::async_trait;
+use futures::StreamExt;
use openssh_sftp_client::fs::DirEntry;
+use openssh_sftp_client::fs::ReadDir;
use crate::raw::oio;
use crate::Result;
-pub struct SftpPager {
- dir: Box<[DirEntry]>,
- path: String,
- limit: Option<usize>,
- complete: bool,
+use super::backend::Connection;
+
+pub enum SftpPager {
+ Empty,
+ Inner(SftpPagerInner),
+}
+
+pub struct SftpPagerInner {
+ dir: Pin<Box<ReadDir>>,
+ prefix: String,
+ limit: usize,
+ _conn: Connection,
}
impl SftpPager {
- pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option<usize>) ->
Self {
- Self {
- dir,
- path,
+ pub fn new(conn: Connection, dir: ReadDir, path: String, limit:
Option<usize>) -> Self {
+ let prefix = if path == "/" { "".to_owned() } else { path };
+
+ let limit = limit.unwrap_or(usize::MAX);
+
+ Self::Inner(SftpPagerInner {
+ dir: Box::pin(dir),
+ prefix,
limit,
- complete: false,
- }
+ _conn: conn,
+ })
}
pub fn empty() -> Self {
- Self {
- dir: Box::new([]),
- path: String::new(),
- limit: None,
- complete: true,
- }
+ Self::Empty
}
}
#[async_trait]
impl oio::Page for SftpPager {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
- if self.complete {
- return Ok(None);
- }
-
- // when listing the root directory, the prefix should be empty
- if self.path == "/" {
- self.path = "".to_owned();
- }
+ match self {
+ Self::Empty => Ok(None),
+ Self::Inner(inner) => {
+ if inner.limit == 0 {
+ return Ok(None);
+ }
- let iter = self
- .dir
- .iter()
- .filter(|e| {
- // filter out "." and ".."
- e.filename().to_str().unwrap() != "." &&
e.filename().to_str().unwrap() != ".."
- })
- .map(|e| map_entry(self.path.clone(), e.clone()));
+ let item = inner.dir.next().await;
- let v: Vec<oio::Entry> = if let Some(limit) = self.limit {
- iter.take(limit).collect()
- } else {
- iter.collect()
- };
-
- self.complete = true;
-
- if v.is_empty() {
- Ok(None)
- } else {
- Ok(Some(v))
+ match item {
+ Some(Ok(e)) => {
+ if e.filename().to_str().unwrap() == "."
Review Comment:
```suggestion
if e.filename().to_str() == Some(".")
```
##########
core/src/services/sftp/pager.rs:
##########
@@ -15,77 +15,81 @@
// specific language governing permissions and limitations
// under the License.
+use std::pin::Pin;
+
use async_trait::async_trait;
+use futures::StreamExt;
use openssh_sftp_client::fs::DirEntry;
+use openssh_sftp_client::fs::ReadDir;
use crate::raw::oio;
use crate::Result;
-pub struct SftpPager {
- dir: Box<[DirEntry]>,
- path: String,
- limit: Option<usize>,
- complete: bool,
+use super::backend::Connection;
+
+pub enum SftpPager {
+ Empty,
+ Inner(SftpPagerInner),
+}
+
+pub struct SftpPagerInner {
+ dir: Pin<Box<ReadDir>>,
+ prefix: String,
+ limit: usize,
+ _conn: Connection,
}
impl SftpPager {
- pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option<usize>) ->
Self {
- Self {
- dir,
- path,
+ pub fn new(conn: Connection, dir: ReadDir, path: String, limit:
Option<usize>) -> Self {
+ let prefix = if path == "/" { "".to_owned() } else { path };
+
+ let limit = limit.unwrap_or(usize::MAX);
+
+ Self::Inner(SftpPagerInner {
+ dir: Box::pin(dir),
+ prefix,
limit,
- complete: false,
- }
+ _conn: conn,
+ })
}
pub fn empty() -> Self {
- Self {
- dir: Box::new([]),
- path: String::new(),
- limit: None,
- complete: true,
- }
+ Self::Empty
}
}
#[async_trait]
impl oio::Page for SftpPager {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
- if self.complete {
- return Ok(None);
- }
-
- // when listing the root directory, the prefix should be empty
- if self.path == "/" {
- self.path = "".to_owned();
- }
+ match self {
+ Self::Empty => Ok(None),
+ Self::Inner(inner) => {
+ if inner.limit == 0 {
+ return Ok(None);
+ }
- let iter = self
- .dir
- .iter()
- .filter(|e| {
- // filter out "." and ".."
- e.filename().to_str().unwrap() != "." &&
e.filename().to_str().unwrap() != ".."
- })
- .map(|e| map_entry(self.path.clone(), e.clone()));
+ let item = inner.dir.next().await;
- let v: Vec<oio::Entry> = if let Some(limit) = self.limit {
- iter.take(limit).collect()
- } else {
- iter.collect()
- };
-
- self.complete = true;
-
- if v.is_empty() {
- Ok(None)
- } else {
- Ok(Some(v))
+ match item {
+ Some(Ok(e)) => {
+ if e.filename().to_str().unwrap() == "."
+ || e.filename().to_str().unwrap() == ".."
Review Comment:
```suggestion
|| e.filename().to_str() == Some("..")
```
##########
core/src/services/sftp/utils.rs:
##########
@@ -16,65 +16,70 @@
// under the License.
use std::io::SeekFrom;
-use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use async_compat::Compat;
-use bb8::PooledConnection;
-use futures::executor::block_on;
+use futures::AsyncRead;
+use futures::AsyncSeek;
+use openssh_sftp_client::file::File;
use openssh_sftp_client::file::TokioCompatFile;
use openssh_sftp_client::metadata::MetaData as SftpMeta;
-use owning_ref::OwningHandle;
-use super::backend::Manager;
+use super::backend::Connection;
use crate::raw::oio;
use crate::raw::oio::into_reader::FdReader;
use crate::raw::oio::ReadExt;
use crate::EntryMode;
use crate::Metadata;
use crate::Result;
-pub struct SftpReader {
- // similar situation to connection struct
- // We can make sure the file can live as long as the connection.
- file: OwningHandle<
- Box<PooledConnection<'static, Manager>>,
- Box<FdReader<Compat<TokioCompatFile<'static>>>>,
- >,
+pub struct SftpReaderInner {
+ file: Pin<Box<Compat<TokioCompatFile>>>,
+ _conn: Connection,
}
+pub type SftpReader = FdReader<SftpReaderInner>;
-impl SftpReader {
- pub async fn new(
- conn: PooledConnection<'static, Manager>,
- path: PathBuf,
- start: u64,
- end: u64,
- ) -> Result<Self> {
- let mut file = OwningHandle::new_with_fn(Box::new(conn), |conn| unsafe
{
- let file = block_on((*conn).sftp.open(path)).unwrap();
- let f = Compat::new(TokioCompatFile::from(file));
- Box::new(oio::into_reader::from_fd(f, start, end))
- });
-
- file.seek(SeekFrom::Start(0)).await?;
-
- Ok(SftpReader { file })
+impl SftpReaderInner {
+ pub async fn new(conn: Connection, file: File) -> Self {
+ let file = Compat::new(file.into());
+ Self {
+ file: Box::pin(file),
+ _conn: conn,
+ }
}
}
-impl oio::Read for SftpReader {
- fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) ->
Poll<Result<usize>> {
- Pin::new(&mut *self.file).poll_read(cx, buf)
+impl SftpReader {
+ /// Create a new reader from a file, starting at the given offset and
ending at the given offset.
+ pub async fn new(conn: Connection, file: File, start: u64, end: u64) ->
Result<Self> {
+ let file = SftpReaderInner::new(conn, file).await;
+ let mut r = oio::into_reader::from_fd(file, start, end);
+ r.seek(SeekFrom::Start(0)).await?;
+ Ok(r)
}
+}
- fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) ->
Poll<Result<u64>> {
- Pin::new(&mut *self.file).poll_seek(cx, pos)
+impl AsyncRead for SftpReaderInner {
Review Comment:
P.S. `TokioCompatFile` also implements `AsyncBufRead`, which could be exposed
here as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]