NobodyXu commented on code in PR #2192:
URL:
https://github.com/apache/incubator-opendal/pull/2192#discussion_r1185758295
##########
core/src/services/sftp/backend.rs:
##########
@@ -465,43 +466,94 @@ impl Accessor for SftpBackend {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ SftpPager::new(client, dir, path.to_owned(), args.limit()),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
- })
- .await?;
-
- Ok(pool)
- }
+ async fn connect(&self) -> std::result::Result<Connection, Error> {
Review Comment:
Creating a new connection for each request is inefficient.
It would be much better to cache the `Connection` in a
`tokio::sync::OnceCell` and reuse it across requests.
##########
core/src/services/sftp/backend.rs:
##########
@@ -465,43 +466,94 @@ impl Accessor for SftpBackend {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ SftpPager::new(client, dir, path.to_owned(), args.limit()),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
- })
- .await?;
-
- Ok(pool)
- }
+ async fn connect(&self) -> std::result::Result<Connection, Error> {
+ let mut session = SessionBuilder::default();
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+ session.user(self.user.clone());
- Ok(conn)
- }
+ if let Some(key) = &self.key {
+ session.keyfile(key);
+ }
+
+ // set control directory to avoid temp files in root directory when
panic
+ session.control_directory("/tmp");
+ session.server_alive_interval(Duration::from_secs(5));
+ session.known_hosts_check(self.known_hosts_strategy.clone());
- pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static,
Manager>> {
- let conn = self.pool().await?.get_owned().await?;
+ // when connection > 10, it will wait others to finish
+ let permit = self.cnt.clone().acquire_owned().await.map_err(|_| {
+ Error::new(ErrorKind::Unexpected, "failed to acquire connection
permit")
+ })?;
+
+ let session = session.connect(&self.endpoint).await?;
+
+ let sess = Box::new(session);
+ let mut oref = OwningHandle::new_with_fn(sess, unsafe {
+ |x| {
+ Box::new(
+ block_on(
+ (*x).subsystem("sftp")
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .spawn(),
+ )
+ .unwrap(),
+ )
+ }
+ });
+
+ let sftp = Sftp::new(
+ oref.stdin().take().unwrap(),
+ oref.stdout().take().unwrap(),
+ Default::default(),
+ )
+ .await?;
Review Comment:
Here is a simpler version, based on this comment:
https://github.com/openssh-rust/openssh-sftp-client/pull/65#issuecomment-1535137285
```suggestion
let stdio = std::sync::Arc::new(once_cell::OnceCell::new());
let stdio_cloned = std::sync::Arc::clone(&stdio);
let mut future = Box::pin(async move {
let mut child = session.subsystem("sftp")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()
.await?;
let stdin = child.stdin().take().unwrap();
let stdout = child.stdout().take().unwrap();
stdio_cloned.set((stdin, stdout)).unwrap();
drop(stdio_cloned);
// Wait forever until being dropped
std::future::pending().await;
// Use child, session after await to keep them alive
drop(child);
drop(session);
Ok(())
});
let (stdin, stdout) = std::future::poll_fn(|cx| {
loop {
if let Poll::Ready(res) = future.as_mut().poll(cx) {
// future only returns on Err
break Err(res.unwrap_err());
}
if Arc::strong_count(stdio) == 1 {
let value = Arc::try_unwrap(stdio).unwrap().take();
if let Some(stdio) = value {
break Ok(stdio);
} else if let Poll::Ready(res) =
future.as_mut().poll(cx) {
// future only returns on Err
break Err(res.unwrap_err());
} else {
unreachable!("future drops stdio yet it does not
produce Ok or Err")
}
}
}
}).await?;
let sftp = Sftp::new_with_auxiliary(
stdin,
stdout,
Default::default(),
openssh_sftp_client::SftpAuxiliaryData::Boxed(Pin::into_inner(future)),
)
.await?;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]