NobodyXu commented on code in PR #2192:
URL:
https://github.com/apache/incubator-opendal/pull/2192#discussion_r1186052502
##########
core/src/services/sftp/backend.rs:
##########
@@ -468,107 +446,128 @@ impl Accessor for SftpBackend {
Ok((
RpList::default(),
- SftpPager::new(client, dir, path.to_owned(), args.limit()),
+ SftpPager::new(dir, path.to_owned(), args.limit()),
))
}
}
impl SftpBackend {
- async fn connect(&self) -> std::result::Result<Connection, Error> {
- let mut session = SessionBuilder::default();
-
- session.user(self.user.clone());
-
- if let Some(key) = &self.key {
- session.keyfile(key);
- }
-
- // set control directory to avoid temp files in root directory when panic
- session.control_directory("/tmp");
- session.server_alive_interval(Duration::from_secs(5));
- session.known_hosts_check(self.known_hosts_strategy.clone());
-
- // when connection > 10, it will wait others to finish
- let permit = self.cnt.clone().acquire_owned().await.map_err(|_| {
- Error::new(ErrorKind::Unexpected, "failed to acquire connection permit")
- })?;
-
- let session = session.connect(&self.endpoint).await?;
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ connect_sftp(
+ self.endpoint.clone(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ )
+ })
+ .await?;
+
+ Ok(sftp)
+ }
+}
- let (tx, rx) = oneshot::channel();
+async fn connect_sftp(
+ endpoint: String,
+ root: String,
+ user: String,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+ let mut session = SessionBuilder::default();
- let mut future = Box::pin(async move {
- let (_child, stdin, stdout) = spawn_child(&session).await?;
- tx.send((stdin, stdout)).ok();
- // Wait forever until being dropped
- std::future::pending::<Result<()>>().await
- });
+ session.user(user.clone());
- let (stdin, stdout) = tokio::select! {
- // Always poll future first to simplify branches
- biased;
+ if let Some(key) = &key {
+ session.keyfile(key);
+ }
- // future would only return on error
- res = future.as_mut() => return Err(res.unwrap_err().into()),
- res = rx => res.map_err(|_| Error::new(ErrorKind::Unexpected, "failed to receive stdin/stdout"))?,
+ // set control directory to avoid temp files in root directory when panic
+ session.control_directory("/tmp");
+ session.server_alive_interval(Duration::from_secs(5));
+ session.known_hosts_check(known_hosts_strategy.clone());
+
+ let session = session.connect(&endpoint).await?;
+
+ let mut stdio = Arc::new(once_cell::sync::OnceCell::new());
+ let stdio_cloned = Arc::clone(&stdio);
+ let mut future = Box::pin(async move {
+ let res = session
+ .subsystem("sftp")
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::null())
+ .spawn()
+ .await;
+
+ let mut child = match res {
+ Ok(child) => child,
+ Err(err) => {
+ stdio_cloned.set(Err(err)).unwrap(); // Err
+ drop(stdio_cloned);
+ return;
+ }
};
- let sftp = Sftp::new_with_auxiliary(
- stdin,
- stdout,
- Default::default(),
- SftpAuxiliaryData::Boxed(Box::new(future)),
- )
- .await?;
+ let stdin = child.stdin().take().unwrap();
+ let stdout = child.stdout().take().unwrap();
+ stdio_cloned.set(Ok((stdin, stdout))).unwrap();
+ drop(stdio_cloned);
- let mut fs = sftp.fs();
- fs.set_cwd("/");
+ // Wait forever until being dropped
+ std::future::pending::<()>().await;
- let paths: Vec<&str> = self.root.split_inclusive('/').skip(1).collect();
- let mut current = "/".to_owned();
- for p in paths {
- if p.is_empty() {
- continue;
- }
+ debug!("sftp child process exited");
Review Comment:
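This `debug!` call will never be executed: the preceding
`std::future::pending::<()>().await` never resolves, so the future can only
end by being dropped, not by running past that line.
To print a message on drop, I recommend using `scopeguard`.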
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]