NobodyXu commented on code in PR #2192:
URL: 
https://github.com/apache/incubator-opendal/pull/2192#discussion_r1185922304


##########
core/src/services/sftp/backend.rs:
##########
@@ -465,43 +463,112 @@ impl Accessor for SftpBackend {
                     return Err(e.into());
                 }
             }
-        };
-        let dir = dir.read_dir().await?;
+        }
+        .read_dir();
 
         Ok((
             RpList::default(),
-            SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+            SftpPager::new(client, dir, path.to_owned(), args.limit()),
         ))
     }
 }
 
 impl SftpBackend {
-    async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
-        let pool = self
-            .sftp
-            .get_or_try_init(|| async {
-                let manager = Manager {
-                    endpoint: self.endpoint.clone(),
-                    user: self.user.clone(),
-                    key: self.key.clone(),
-                };
-
-                bb8::Pool::builder().max_size(10).build(manager).await
-            })
-            .await?;
-
-        Ok(pool)
-    }
+    async fn connect(&self) -> std::result::Result<Connection, Error> {
+        let mut session = SessionBuilder::default();
+
+        session.user(self.user.clone());
+
+        if let Some(key) = &self.key {
+            session.keyfile(key);
+        }
+
+        // set control directory to avoid temp files in root directory when 
panic
+        session.control_directory("/tmp");
+        session.server_alive_interval(Duration::from_secs(5));
+        session.known_hosts_check(self.known_hosts_strategy.clone());
+
+        // when connection > 10, it will wait others to finish
+        let permit = self.cnt.clone().acquire_owned().await.map_err(|_| {
+            Error::new(ErrorKind::Unexpected, "failed to acquire connection 
permit")
+        })?;
+
+        let session = session.connect(&self.endpoint).await?;
+
+        let (tx, rx) = oneshot::channel();
+
+        let mut future = Box::pin(async move {
+            let (_child, stdin, stdout) = spawn_child(&session).await?;
+            tx.send((stdin, stdout)).ok();
+            // Wait forever until being dropped
+            std::future::pending::<Result<()>>().await
+        });
+
+        let (stdin, stdout) = tokio::select! {
+            // Always poll future first to simplify branches
+            biased;
 
-    pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
-        let conn = self.pool().await?.get().await?;
+            // future would only return on error
+            res = future.as_mut() => return Err(res.unwrap_err().into()),
+            res = rx => res.map_err(|_| Error::new(ErrorKind::Unexpected, 
"failed to receive stdin/stdout"))?,
+        };
 
-        Ok(conn)
+        let sftp = Sftp::new_with_auxiliary(
+            stdin,
+            stdout,
+            Default::default(),
+            SftpAuxiliaryData::Boxed(Box::new(future)),

Review Comment:
   @silver-ymz I've added a new variant `SftpAuxiliaryData::PinnedFuture` for 
this.
   
   P.S. docs.rs is currently overloaded so it won't show up there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to