NobodyXu commented on code in PR #2192:
URL:
https://github.com/apache/incubator-opendal/pull/2192#discussion_r1187012394
##########
core/Cargo.toml:
##########
@@ -184,9 +179,8 @@ minitrace = { version = "0.4.0", optional = true }
moka = { version = "0.10", optional = true, features = ["future"] }
once_cell = "1"
openssh = { version = "0.9.9", optional = true }
-openssh-sftp-client = { version = "0.12.2", optional = true }
+openssh-sftp-client = { version = "0.13.3", optional = true, features =
["openssh"] }
Review Comment:
I recommend enabling the `tracing` feature for better logging in CI and production.
```suggestion
openssh-sftp-client = { version = "0.13.3", optional = true, features =
["openssh", "tracing"] }
```
##########
core/src/services/sftp/backend.rs:
##########
@@ -309,9 +281,9 @@ impl Accessor for SftpBackend {
}
async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let paths: Vec<&str> = path.split_inclusive('/').collect();
Review Comment:
`std::path::Path` has a method called `components`, which is more robust
than manually parsing the path.
##########
core/src/services/sftp/backend.rs:
##########
@@ -452,58 +428,97 @@ impl Accessor for SftpBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let file_path = format!("./{}", path);
- let mut dir = match fs.open_dir(file_path.clone()).await {
+ let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
- return Ok((RpList::default(), SftpPager::empty()));
+ return Ok((RpList::default(), None));
} else {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ Some(SftpPager::new(dir, path.to_owned(), args.limit())),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ Box::pin(connect_sftp(
+ self.endpoint.clone(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ ))
})
.await?;
- Ok(pool)
+ Ok(sftp)
}
+}
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+async fn connect_sftp(
+ endpoint: String,
+ root: String,
+ user: String,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+ let mut session = SessionBuilder::default();
- Ok(conn)
+ session.user(user.clone());
+
+ if let Some(key) = &key {
+ session.keyfile(key);
}
- pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static,
Manager>> {
- let conn = self.pool().await?.get_owned().await?;
+ // set control directory to avoid temp files in root directory when panic
+ session.control_directory("/tmp");
Review Comment:
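I recommend using
[dirs::runtime_dir](https://docs.rs/dirs/latest/dirs/fn.runtime_dir.html) as
the control directory.
It returns a directory that is only accessible by the current user, reducing the risk
of it being accidentally accessed or modified by other users. Note that it returns an
`Option<PathBuf>`, so a fallback is still needed when it is `None`.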
##########
core/src/services/sftp/backend.rs:
##########
@@ -452,58 +428,97 @@ impl Accessor for SftpBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let file_path = format!("./{}", path);
- let mut dir = match fs.open_dir(file_path.clone()).await {
+ let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
- return Ok((RpList::default(), SftpPager::empty()));
+ return Ok((RpList::default(), None));
} else {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ Some(SftpPager::new(dir, path.to_owned(), args.limit())),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ Box::pin(connect_sftp(
+ self.endpoint.clone(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ ))
})
.await?;
- Ok(pool)
+ Ok(sftp)
}
+}
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+async fn connect_sftp(
+ endpoint: String,
+ root: String,
+ user: String,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+ let mut session = SessionBuilder::default();
- Ok(conn)
+ session.user(user.clone());
+
+ if let Some(key) = &key {
+ session.keyfile(key);
}
- pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static,
Manager>> {
- let conn = self.pool().await?.get_owned().await?;
+ // set control directory to avoid temp files in root directory when panic
+ session.control_directory("/tmp");
+ session.server_alive_interval(Duration::from_secs(5));
+ session.known_hosts_check(known_hosts_strategy.clone());
+
+ let session = session.connect(&endpoint).await?;
+
+ let sftp = Sftp::from_session(session, SftpOptions::default()).await?;
+
+ let mut fs = sftp.fs();
+ fs.set_cwd("/");
- Ok(conn)
+ let paths: Vec<&str> = root.split_inclusive('/').skip(1).collect();
Review Comment:
`std::path::Path` has a method called `components`, which is more robust
than manually parsing the path.
##########
core/src/services/sftp/backend.rs:
##########
@@ -452,58 +428,97 @@ impl Accessor for SftpBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let file_path = format!("./{}", path);
- let mut dir = match fs.open_dir(file_path.clone()).await {
+ let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
- return Ok((RpList::default(), SftpPager::empty()));
+ return Ok((RpList::default(), None));
} else {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ Some(SftpPager::new(dir, path.to_owned(), args.limit())),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ Box::pin(connect_sftp(
+ self.endpoint.clone(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ ))
})
.await?;
- Ok(pool)
+ Ok(sftp)
}
+}
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+async fn connect_sftp(
+ endpoint: String,
+ root: String,
+ user: String,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+ let mut session = SessionBuilder::default();
- Ok(conn)
+ session.user(user.clone());
+
+ if let Some(key) = &key {
+ session.keyfile(key);
}
- pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static,
Manager>> {
- let conn = self.pool().await?.get_owned().await?;
+ // set control directory to avoid temp files in root directory when panic
+ session.control_directory("/tmp");
+ session.server_alive_interval(Duration::from_secs(5));
+ session.known_hosts_check(known_hosts_strategy.clone());
Review Comment:
```suggestion
session.known_hosts_check(known_hosts_strategy);
```
##########
core/src/services/sftp/backend.rs:
##########
@@ -422,9 +397,10 @@ impl Accessor for SftpBackend {
}
}
.read_dir()
- .await?;
+ .boxed();
- for file in &dir {
+ while let Some(file) = dir.next().await {
+ let file = file?;
let file_name = file.filename().to_str().unwrap();
if file_name == "." || file_name == ".." {
Review Comment:
Using `unwrap` here is bad; you can compare the path directly instead, e.g. `file.filename() == Path::new(".")`.
##########
core/src/services/sftp/backend.rs:
##########
@@ -452,58 +428,97 @@ impl Accessor for SftpBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let file_path = format!("./{}", path);
- let mut dir = match fs.open_dir(file_path.clone()).await {
+ let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
- return Ok((RpList::default(), SftpPager::empty()));
+ return Ok((RpList::default(), None));
} else {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ Some(SftpPager::new(dir, path.to_owned(), args.limit())),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ Box::pin(connect_sftp(
+ self.endpoint.clone(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ ))
})
.await?;
- Ok(pool)
+ Ok(sftp)
}
+}
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+async fn connect_sftp(
+ endpoint: String,
+ root: String,
+ user: String,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+ let mut session = SessionBuilder::default();
- Ok(conn)
+ session.user(user.clone());
Review Comment:
```suggestion
session.user(user);
```
##########
core/src/services/sftp/backend.rs:
##########
@@ -452,58 +428,97 @@ impl Accessor for SftpBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let file_path = format!("./{}", path);
- let mut dir = match fs.open_dir(file_path.clone()).await {
+ let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
- return Ok((RpList::default(), SftpPager::empty()));
+ return Ok((RpList::default(), None));
} else {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ Some(SftpPager::new(dir, path.to_owned(), args.limit())),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ Box::pin(connect_sftp(
+ self.endpoint.clone(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ ))
})
.await?;
- Ok(pool)
+ Ok(sftp)
}
+}
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+async fn connect_sftp(
+ endpoint: String,
Review Comment:
Passing a reference for `endpoint` is enough.
In fact, I think it would be better to pass everything by reference and only
clone when necessary.
```suggestion
endpoint: &str,
```
##########
core/src/services/sftp/backend.rs:
##########
@@ -452,58 +428,97 @@ impl Accessor for SftpBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let file_path = format!("./{}", path);
- let mut dir = match fs.open_dir(file_path.clone()).await {
+ let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
- return Ok((RpList::default(), SftpPager::empty()));
+ return Ok((RpList::default(), None));
} else {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ Some(SftpPager::new(dir, path.to_owned(), args.limit())),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ Box::pin(connect_sftp(
+ self.endpoint.clone(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ ))
})
.await?;
- Ok(pool)
+ Ok(sftp)
}
+}
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+async fn connect_sftp(
+ endpoint: String,
+ root: String,
+ user: String,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+ let mut session = SessionBuilder::default();
- Ok(conn)
+ session.user(user.clone());
+
+ if let Some(key) = &key {
+ session.keyfile(key);
}
- pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static,
Manager>> {
- let conn = self.pool().await?.get_owned().await?;
+ // set control directory to avoid temp files in root directory when panic
+ session.control_directory("/tmp");
Review Comment:
Also, on macOS, `/tmp` is just a symlink to `/private/tmp`, so using `runtime_dir`
makes even more sense here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]