Xuanwo commented on code in PR #51:
URL: https://github.com/apache/paimon-rust/pull/51#discussion_r1719185873


##########
crates/paimon/src/io/file_io.rs:
##########
@@ -85,42 +99,47 @@ impl FileIO {
     /// References: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L105>
     ///
     /// FIXME: how to handle large dir? Better to return a stream instead?
-    pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
-        let entries = self
-            .op
-            .list_with(path)
-            .metakey(Metakey::ContentLength | Metakey::LastModified)
-            .await
-            .context(IoUnexpectedSnafu {
-                message: "Failed to list file status".to_string(),
-            })?;
+    pub async fn list_status(&self, path: String) -> Result<Vec<FileStatus>> {
+        let (op, relative_path) = self.inner_storage.create_operator(&path)?;
+
+        let entries = op.list(relative_path).await.context(IoUnexpectedSnafu {
+            message: "opendal list status failed",
+        })?;
+
+        let mut statuses = Vec::new();
 
-        Ok(entries
-            .into_iter()
-            .map(|meta| FileStatus {
-                size: meta.metadata().content_length(),
-                is_dir: meta.metadata().is_dir(),
-                last_modified: meta.metadata().last_modified(),
-                path: format!("{}{}", path, meta.name()),
-            })
-            .collect())
+        for entry in entries {
+            let meta = entry.metadata();
+            statuses.push(FileStatus {
+                size: meta.content_length(),
+                is_dir: meta.is_dir(),
+                path: path.clone(),
+                last_modified: meta.last_modified(),
+            });
+        }
+
+        Ok(statuses)
     }
 
     /// Check if exists.
     ///
     /// References: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
-    pub async fn exists(&self, path: &str) -> Result<bool> {
-        self.op.is_exist(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to check file existence".to_string(),
+    pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {

Review Comment:
   Same as the `new_input` comment: accept `&str` here instead of `impl AsRef<str>`.



##########
crates/paimon/src/error.rs:
##########
@@ -23,11 +23,11 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Error type for paimon.
 #[derive(Debug, Snafu)]
 pub enum Error {
-    #[snafu(display("Paimon data invalid for {}: {:?}", message, source))]
+    #[snafu(display("Paimon data invalid for {}", message))]

Review Comment:
   Is this change relevant?



##########
crates/paimon/src/io/file_io.rs:
##########
@@ -17,66 +17,80 @@
 
 use crate::error::*;
 use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
 
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
 use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
 
 #[derive(Clone, Debug)]
 pub struct FileIO {
-    op: Operator,
+    inner_storage: Arc<Storage>,

Review Comment:
   I prefer short names whenever possible. How about using `storage`?



##########
crates/paimon/src/io/storage.rs:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use opendal::{Operator, Scheme};
+
+use crate::error;
+
+use super::FileIOBuilder;
+
+/// The storage carries all supported storage services in paimon
+#[derive(Debug)]
+pub enum Storage {
+    #[cfg(feature = "storage-memory")]
+    Memory,
+    #[cfg(feature = "storage-fs")]
+    LocalFs,
+}
+
+impl Storage {
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> 
{
+        let (scheme_str, _) = file_io_builder.into_parts();
+        let scheme = Self::parse_scheme(&scheme_str)?;
+
+        match scheme {
+            #[cfg(feature = "storage-memory")]
+            Scheme::Memory => Ok(Self::Memory),
+            #[cfg(feature = "storage-fs")]
+            Scheme::Fs => Ok(Self::LocalFs),
+            _ => Err(error::Error::IoUnsupported {
+                message: "Unsupported storage feature".to_string(),
+            }),
+        }
+    }
+
+    pub(crate) fn create_operator<'a>(
+        &self,
+        path: &'a impl AsRef<str>,

Review Comment:
   I prefer to keep things simple. What do you think about using `&str` here?



##########
crates/paimon/src/io/file_io.rs:
##########
@@ -129,10 +148,15 @@ impl FileIO {
     /// Delete a dir recursively.
     ///
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
-    pub async fn delete_dir(&self, path: &str) -> Result<()> {
-        self.op.remove_all(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to delete dir".to_string(),
-        })?;
+    pub async fn delete_dir(&self, path: impl AsRef<str>) -> Result<()> {

Review Comment:
   Same as the `new_input` comment: accept `&str` here instead of `impl AsRef<str>`.



##########
crates/paimon/src/io/file_io.rs:
##########
@@ -129,10 +148,15 @@ impl FileIO {
     /// Delete a dir recursively.
     ///
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
-    pub async fn delete_dir(&self, path: &str) -> Result<()> {
-        self.op.remove_all(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to delete dir".to_string(),
-        })?;
+    pub async fn delete_dir(&self, path: impl AsRef<str>) -> Result<()> {
+        let (op, relative_path) = self.inner_storage.create_operator(&path)?;
+
+        op.remove_all(relative_path)
+            .await
+            .context(IoUnexpectedSnafu {
+                message: "opendal delete directory failed",

Review Comment:
   We don't need to expose the underlying detail that the `opendal` delete failed.



##########
crates/paimon/src/io/file_io.rs:
##########
@@ -17,66 +17,80 @@
 
 use crate::error::*;
 use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
 
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
 use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
 
 #[derive(Clone, Debug)]
 pub struct FileIO {
-    op: Operator,
+    inner_storage: Arc<Storage>,
 }
 
 impl FileIO {
-    /// Create a new FileIO.
+    /// Try to infer file io scheme from path.
     ///
     /// The input HashMap is paimon-java's 
[`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
-    ///
-    /// TODO: Support building Operator from HashMap via options.
-    pub fn new(_: HashMap<String, String>) -> Result<Self> {
-        let op = Operator::new(Fs::default().root("/"))
-            .context(IoUnexpectedSnafu {
-                message: "Failed to create operator".to_string(),
-            })?
-            .finish();
-        Ok(Self { op })
+    pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+        let url = Url::parse(path.as_ref())
+            .context(UrlParseSnafu)
+            .or_else(|_| {
+                Url::from_file_path(path.as_ref()).map_err(|_| 
Error::DataInvalid {
+                    message: "Input is neither a valid URL nor a valid file 
path".to_string(),
+                })
+            })?;
+
+        Ok(FileIOBuilder::new(url.scheme()))
     }
 
     /// Create a new input file to read data.
     ///
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
-    pub fn new_input(&self, path: &str) -> InputFile {
-        InputFile {
-            _op: self.op.clone(),
-            path: path.to_string(),
-        }
+    pub fn new_input(&self, path: impl AsRef<str>) -> crate::Result<InputFile> 
{

Review Comment:
   I think the `impl AsRef<str>` is somewhat overengineered. Accepting a `&str` 
is sufficient, especially since we're calling `to_string` internally.



##########
crates/paimon/src/io/file_io.rs:
##########
@@ -17,66 +17,80 @@
 
 use crate::error::*;
 use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
 
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
 use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
 
 #[derive(Clone, Debug)]
 pub struct FileIO {
-    op: Operator,
+    inner_storage: Arc<Storage>,
 }
 
 impl FileIO {
-    /// Create a new FileIO.
+    /// Try to infer file io scheme from path.
     ///
     /// The input HashMap is paimon-java's 
[`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
-    ///
-    /// TODO: Support building Operator from HashMap via options.
-    pub fn new(_: HashMap<String, String>) -> Result<Self> {
-        let op = Operator::new(Fs::default().root("/"))
-            .context(IoUnexpectedSnafu {
-                message: "Failed to create operator".to_string(),
-            })?
-            .finish();
-        Ok(Self { op })
+    pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+        let url = Url::parse(path.as_ref())
+            .context(UrlParseSnafu)
+            .or_else(|_| {
+                Url::from_file_path(path.as_ref()).map_err(|_| 
Error::DataInvalid {
+                    message: "Input is neither a valid URL nor a valid file 
path".to_string(),
+                })
+            })?;
+
+        Ok(FileIOBuilder::new(url.scheme()))
     }
 
     /// Create a new input file to read data.
     ///
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
-    pub fn new_input(&self, path: &str) -> InputFile {
-        InputFile {
-            _op: self.op.clone(),
-            path: path.to_string(),
-        }
+    pub fn new_input(&self, path: impl AsRef<str>) -> crate::Result<InputFile> 
{
+        let (op, relative_path) = self.inner_storage.create_operator(&path)?;
+        let path = path.as_ref().to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(InputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
     }
 
     /// Create a new output file to write data.
     ///
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
-    pub fn new_output(&self, path: &str) -> OutputFile {
-        OutputFile {
-            _op: self.op.clone(),
-            path: path.to_string(),
-        }
+    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
+        let (op, relative_path) = self.inner_storage.create_operator(&path)?;
+        let path = path.as_ref().to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(OutputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
     }
 
     /// Return a file status object that represents the path.
     ///
     /// Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
-    pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
-        let meta = self.op.stat(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to get file status".to_string(),
+    pub async fn get_status(&self, path: String) -> Result<FileStatus> {

Review Comment:
   It's better to keep our API consistent. Accepting `&str` here looks nicer to 
me.



##########
crates/paimon/src/error.rs:
##########
@@ -42,4 +42,21 @@ pub enum Error {
         message: String,
         source: opendal::Error,
     },
+    #[snafu(
+        visibility(pub(crate)),
+        display("Paimon hitting unsupported io error {}", message)
+    )]
+    IoUnsupported { message: String },
+    #[snafu(visibility(pub(crate)), display("Failed to parse URL: {:?}", 
source))]
+    UrlParse { source: url::ParseError },

Review Comment:
   I don't think we should expose a URL parse error directly to the user. 
Perhaps we can use `ConfigInvalid` with an appropriate message instead.



##########
crates/paimon/src/io/storage.rs:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use opendal::{Operator, Scheme};
+
+use crate::error;
+
+use super::FileIOBuilder;
+
+/// The storage carries all supported storage services in paimon
+#[derive(Debug)]
+pub enum Storage {
+    #[cfg(feature = "storage-memory")]
+    Memory,
+    #[cfg(feature = "storage-fs")]
+    LocalFs,
+}
+
+impl Storage {
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> 
{
+        let (scheme_str, _) = file_io_builder.into_parts();
+        let scheme = Self::parse_scheme(&scheme_str)?;
+
+        match scheme {
+            #[cfg(feature = "storage-memory")]
+            Scheme::Memory => Ok(Self::Memory),
+            #[cfg(feature = "storage-fs")]
+            Scheme::Fs => Ok(Self::LocalFs),
+            _ => Err(error::Error::IoUnsupported {
+                message: "Unsupported storage feature".to_string(),
+            }),
+        }
+    }
+
+    pub(crate) fn create_operator<'a>(

Review Comment:
   How about using `create`?



##########
crates/paimon/src/io/file_io.rs:
##########
@@ -17,66 +17,80 @@
 
 use crate::error::*;
 use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
 
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
 use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
 
 #[derive(Clone, Debug)]
 pub struct FileIO {
-    op: Operator,
+    inner_storage: Arc<Storage>,
 }
 
 impl FileIO {
-    /// Create a new FileIO.
+    /// Try to infer file io scheme from path.
     ///
     /// The input HashMap is paimon-java's 
[`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
-    ///
-    /// TODO: Support building Operator from HashMap via options.
-    pub fn new(_: HashMap<String, String>) -> Result<Self> {
-        let op = Operator::new(Fs::default().root("/"))
-            .context(IoUnexpectedSnafu {
-                message: "Failed to create operator".to_string(),
-            })?
-            .finish();
-        Ok(Self { op })
+    pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {

Review Comment:
   Maybe this API should be called `from_url`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to