This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch implement-list-metakey in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 9730699fd4569a8c2dbe062e7bd08af0ea761ea9 Author: Xuanwo <[email protected]> AuthorDate: Mon Aug 7 15:41:48 2023 +0800 feat: Add async list with metakey support Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/immutable_index.rs | 25 +-- core/src/raw/oio/entry.rs | 2 +- core/src/raw/ops.rs | 19 +++ core/src/types/entry.rs | 48 +++--- core/src/types/list.rs | 153 ++++++------------ core/src/types/metadata.rs | 52 ++++++ core/src/types/operator/blocking_operator.rs | 94 ----------- core/src/types/operator/operator.rs | 231 +++++++++++---------------- 8 files changed, 248 insertions(+), 376 deletions(-) diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index cc2cf1907..374482151 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -310,10 +310,7 @@ mod tests { "duplicated value: {}", entry.path() ); - map.insert( - entry.path().to_string(), - op.metadata(&entry, Metakey::Mode).await?.mode(), - ); + map.insert(entry.path().to_string(), entry.metadata().mode()); } assert_eq!(map["file"], EntryMode::FILE); @@ -351,10 +348,7 @@ mod tests { "duplicated value: {}", entry.path() ); - map.insert( - entry.path().to_string(), - op.metadata(&entry, Metakey::Mode).await?.mode(), - ); + map.insert(entry.path().to_string(), entry.metadata().mode()); } debug!("current files: {:?}", map); @@ -398,10 +392,7 @@ mod tests { "duplicated value: {}", entry.path() ); - map.insert( - entry.path().to_string(), - op.metadata(&entry, Metakey::Mode).await?.mode(), - ); + map.insert(entry.path().to_string(), entry.metadata().mode()); } assert_eq!(map.len(), 1); @@ -417,10 +408,7 @@ mod tests { "duplicated value: {}", entry.path() ); - map.insert( - entry.path().to_string(), - op.metadata(&entry, Metakey::Mode).await?.mode(), - ); + map.insert(entry.path().to_string(), entry.metadata().mode()); } assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE); @@ -462,10 +450,7 @@ mod tests { "duplicated value: {}", entry.path() ); - map.insert( - entry.path().to_string(), - op.metadata(&entry, Metakey::Mode).await?.mode(), - ); + map.insert(entry.path().to_string(), entry.metadata().mode()); } debug!("current files: {:?}", map); diff --git a/core/src/raw/oio/entry.rs b/core/src/raw/oio/entry.rs index f476aa887..43c4b9631 100644 --- a/core/src/raw/oio/entry.rs +++ b/core/src/raw/oio/entry.rs @@ -79,6 +79,6 @@ impl Entry { /// /// NOTE: implement this by hand to avoid leaking raw entry to end-users. pub(crate) fn into_entry(self) -> crate::Entry { - crate::Entry::new_with(self.path, self.meta) + crate::Entry::new(self.path, self.meta) } } diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 308930b5c..646df692b 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -19,6 +19,8 @@ //! //! By using ops, users can add more context for operation. +use crate::Metakey; +use flagset::FlagSet; use std::time::Duration; use crate::raw::*; @@ -77,6 +79,8 @@ pub struct OpList { /// The delimiter used to for the list operation. Default to be `/` delimiter: String, + + metakey: FlagSet<Metakey>, } impl Default for OpList { @@ -85,6 +89,8 @@ impl Default for OpList { limit: None, start_after: None, delimiter: "/".to_string(), + // By default, we want to know what's the mode of this entry. + metakey: Metakey::Mode.into(), } } } @@ -127,6 +133,19 @@ impl OpList { pub fn delimiter(&self) -> &str { &self.delimiter } + + /// Change the metakey of this list operation. + /// + /// The default metakey is `Metakey::Mode`. + pub fn with_metakey(mut self, metakey: impl Into<FlagSet<Metakey>>) -> Self { + self.metakey = metakey.into(); + self + } + + /// Get the current metakey. + pub fn metakey(&self) -> FlagSet<Metakey> { + self.metakey + } } /// Args for `presign` operation. diff --git a/core/src/types/entry.rs b/core/src/types/entry.rs index d0a8be2db..e87b55ff4 100644 --- a/core/src/types/entry.rs +++ b/core/src/types/entry.rs @@ -18,14 +18,16 @@ use crate::raw::*; use crate::*; -/// Entry is the file/dir entry returned by `Lister`. +/// Entry returned by [`Lister`] or [`BlockingLister`] to represent a path and it's relative metadata. +/// +/// TODO: add notes about metakey here. #[derive(Clone, Debug)] pub struct Entry { - /// Path of the entry. + /// Path of this entry. path: String, - /// Optional cached metadata - metadata: Option<Metadata>, + /// Metadata of this entry. + metadata: Metadata, } impl Entry { @@ -38,43 +40,35 @@ impl Entry { /// /// The only way to get an entry with associated cached metadata /// is `Operator::list` or `Operator::scan`. - pub(crate) fn new_with(path: String, metadata: Metadata) -> Self { - Self { - path, - metadata: Some(metadata), - } - } - - /// Create an [`Entry`] with empty cached metadata. - pub fn new(path: &str) -> Self { - Self { - path: normalize_path(path), - metadata: None, - } + pub(crate) fn new(path: String, metadata: Metadata) -> Self { + Self { path, metadata } } /// Path of entry. Path is relative to operator's root. + /// /// Only valid in current operator. + /// + /// If this entry is a dir, `path` MUST end with `/` + /// Otherwise, `path` MUST NOT end with `/`. pub fn path(&self) -> &str { &self.path } /// Name of entry. Name is the last segment of path. /// - /// If this entry is a dir, `Name` MUST endswith `/` - /// Otherwise, `Name` MUST NOT endswith `/`. + /// If this entry is a dir, `name` MUST end with `/` + /// Otherwise, `name` MUST NOT end with `/`. pub fn name(&self) -> &str { get_basename(&self.path) } - /// Get the cached metadata of entry. - /// - /// # Notes - /// - /// This function is crate internal only. Because the returning - /// metadata could be incomplete. Users must use `Operator::metadata` - /// to query the cached metadata instead. - pub(crate) fn metadata(&self) -> &Option<Metadata> { + /// Fetch metadata of this entry. + pub fn metadata(&self) -> &Metadata { &self.metadata } + + /// Consume this entry to get it's path and metadata. + pub fn into_parts(self) -> (String, Metadata) { + (self.path, self.metadata) + } } diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 275a76db7..a59bd31f4 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +use flagset::FlagSet; use std::collections::VecDeque; -use std::mem; use std::pin::Pin; use std::task::ready; use std::task::Context; @@ -29,22 +29,26 @@ use futures::Stream; use crate::raw::*; use crate::*; +/// Future constructed by listing. +type ListFuture = BoxFuture<'static, (oio::Pager, Result<Option<Vec<oio::Entry>>>)>; +/// Future constructed by stating. +type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>; + /// Lister is designed to list entries at given path in an asynchronous /// manner. /// -/// Users can construct Lister by `list` or `scan`. +/// Users can construct Lister by [`Operator::lister`]. /// -/// User can use lister as `Stream<Item = Result<Entry>>` or -/// call `next_page` directly. +/// User can use lister as `Stream<Item = Result<Entry>>`. pub struct Lister { - pager: Option<oio::Pager>, + acc: FusedAccessor, + /// required_metakey is the metakey required by users. + required_metakey: FlagSet<Metakey>, buf: VecDeque<oio::Entry>, - /// We will move `pager` inside future and return it back while future is ready. - /// Thus, we should not allow calling other function while we already have - /// a future. - #[allow(clippy::type_complexity)] - fut: Option<BoxFuture<'static, (oio::Pager, Result<Option<Vec<oio::Entry>>>)>>, + pager: Option<oio::Pager>, + listing: Option<ListFuture>, + stating: Option<StatFuture>, } /// # Safety @@ -54,75 +58,19 @@ unsafe impl Sync for Lister {} impl Lister { /// Create a new lister. - pub(crate) fn new(pager: oio::Pager) -> Self { - Self { - pager: Some(pager), - buf: VecDeque::default(), - fut: None, - } - } - - /// has_next can be used to check if there are more pages. - pub async fn has_next(&mut self) -> Result<bool> { - debug_assert!( - self.fut.is_none(), - "there are ongoing futures for next page" - ); + pub(crate) async fn create(acc: FusedAccessor, path: &str, args: OpList) -> Result<Self> { + let required_metakey = args.metakey(); + let (_, pager) = acc.list(path, args).await?; - if !self.buf.is_empty() { - return Ok(true); - } - - let entries = match self - .pager - .as_mut() - .expect("pager must be valid") - .next() - .await? - { - // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. - // - // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) - Some(entries) => entries.into(), - None => return Ok(false), - }; - // Push fetched entries into buffer. - self.buf = entries; - - Ok(true) - } + Ok(Self { + acc, + required_metakey, - /// next_page can be used to fetch a new page. - /// - /// # Notes - /// - /// Don't mix the usage of `next_page` and `Stream<Item = Result<Entry>>`. - /// Always using the same calling style. - pub async fn next_page(&mut self) -> Result<Option<Vec<Entry>>> { - debug_assert!( - self.fut.is_none(), - "there are ongoing futures for next page" - ); - - let entries = if !self.buf.is_empty() { - mem::take(&mut self.buf) - } else { - match self - .pager - .as_mut() - .expect("pager must be valid") - .next() - .await? - { - // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. - // - // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) - Some(entries) => entries.into(), - None => return Ok(None), - } - }; - - Ok(Some(entries.into_iter().map(|v| v.into_entry()).collect())) + buf: VecDeque::new(), + pager: Some(pager), + listing: None, + stating: None, + }) } } @@ -130,22 +78,44 @@ impl Stream for Lister { type Item = Result<Entry>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if let Some(fut) = self.stating.as_mut() { + let (path, rp) = ready!(fut.poll_unpin(cx)); + let metadata = rp?.into_metadata(); + + self.stating = None; + return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); + } + if let Some(oe) = self.buf.pop_front() { - return Poll::Ready(Some(Ok(oe.into_entry()))); + let (path, metadata) = oe.into_entry().into_parts(); + // TODO: we can optimize this by checking the provided metakey provided by services. + if metadata.contains_bit(self.required_metakey) { + return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); + } + + let acc = self.acc.clone(); + let fut = async move { + let path = path; + let res = acc.stat(&path, OpStat::default()).await; + + (path, res) + }; + self.stating = Some(Box::pin(fut)); + return self.poll_next(cx); } - if let Some(fut) = self.fut.as_mut() { + if let Some(fut) = self.listing.as_mut() { let (op, res) = ready!(fut.poll_unpin(cx)); self.pager = Some(op); return match res? { Some(oes) => { - self.fut = None; + self.listing = None; self.buf = oes.into(); self.poll_next(cx) } None => { - self.fut = None; + self.listing = None; Poll::Ready(None) } }; @@ -157,7 +127,7 @@ impl Stream for Lister { (pager, res) }; - self.fut = Some(Box::pin(fut)); + self.listing = Some(Box::pin(fut)); self.poll_next(cx) } } @@ -165,7 +135,7 @@ impl Stream for Lister { /// BlockingLister is designed to list entries at given path in a blocking /// manner. /// -/// Users can construct Lister by `blocking_list` or `blocking_scan`. +/// Users can construct Lister by `blocking_lister`. pub struct BlockingLister { pager: oio::BlockingPager, buf: VecDeque<oio::Entry>, @@ -184,23 +154,6 @@ impl BlockingLister { buf: VecDeque::default(), } } - - /// next_page can be used to fetch a new page. - pub fn next_page(&mut self) -> Result<Option<Vec<Entry>>> { - let entries = if !self.buf.is_empty() { - mem::take(&mut self.buf) - } else { - match self.pager.next()? { - // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. - // - // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) - Some(entries) => entries.into(), - None => return Ok(None), - } - }; - - Ok(Some(entries.into_iter().map(|v| v.into_entry()).collect())) - } } /// TODO: we can implement next_chunk. diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs index 02f5bb90d..66a7b41d4 100644 --- a/core/src/types/metadata.rs +++ b/core/src/types/metadata.rs @@ -85,6 +85,18 @@ impl Metadata { self } + /// Check if there metadata already contains given bit. + pub(crate) fn contains_bit(&self, bit: impl Into<FlagSet<Metakey>>) -> bool { + let input_bit = bit.into(); + + // If meta already contains complete, we don't need to check. + if self.bit.contains(Metakey::Complete) { + return true; + } + + self.bit.contains(input_bit) + } + /// mode represent this entry's mode. pub fn mode(&self) -> EntryMode { debug_assert!( @@ -155,6 +167,11 @@ impl Metadata { /// /// `Content-Length` is defined by [RFC 7230](https://httpwg.org/specs/rfc7230.html#header.content-length) /// Refer to [MDN Content-Length](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) for more information. + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::ContentLength`], otherwise it will panic. pub fn content_length(&self) -> u64 { debug_assert!( self.bit.contains(Metakey::ContentLength) || self.bit.contains(Metakey::Complete), @@ -189,6 +206,11 @@ impl Metadata { /// And removed by [RFC 7231](https://www.rfc-editor.org/rfc/rfc7231). /// /// OpenDAL will try its best to set this value, but not guarantee this value is the md5 of content. + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::ContentMd5`], otherwise it will panic. pub fn content_md5(&self) -> Option<&str> { debug_assert!( self.bit.contains(Metakey::ContentMd5) || self.bit.contains(Metakey::Complete), @@ -221,6 +243,11 @@ impl Metadata { /// Content Type of this entry. /// /// Content Type is defined by [RFC 9110](https://httpwg.org/specs/rfc9110.html#field.content-type). + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::ContentType`], otherwise it will panic. pub fn content_type(&self) -> Option<&str> { debug_assert!( self.bit.contains(Metakey::ContentType) || self.bit.contains(Metakey::Complete), @@ -251,6 +278,11 @@ impl Metadata { /// Content Range of this entry. /// /// Content Range is defined by [RFC 9110](https://httpwg.org/specs/rfc9110.html#field.content-range). + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::ContentRange`], otherwise it will panic. pub fn content_range(&self) -> Option<BytesContentRange> { debug_assert!( self.bit.contains(Metakey::ContentRange) || self.bit.contains(Metakey::Complete), @@ -284,6 +316,11 @@ impl Metadata { /// Refer to [MDN Last-Modified](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified) for more information. /// /// OpenDAL parse the raw value into [`DateTime`] for convenient. + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::LastModified`], otherwise it will panic. pub fn last_modified(&self) -> Option<DateTime<Utc>> { debug_assert!( self.bit.contains(Metakey::LastModified) || self.bit.contains(Metakey::Complete), @@ -324,6 +361,11 @@ impl Metadata { /// - `W/"0815"` /// /// `"` is part of etag. + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::Etag`], otherwise it will panic. pub fn etag(&self) -> Option<&str> { debug_assert!( self.bit.contains(Metakey::Etag) || self.bit.contains(Metakey::Complete), @@ -378,6 +420,11 @@ impl Metadata { /// - "inline" /// - "attachment" /// - "attachment; filename=\"filename.jpg\"" + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::ContentDisposition`], otherwise it will panic. pub fn content_disposition(&self) -> Option<&str> { debug_assert!( self.bit.contains(Metakey::ContentDisposition) || self.bit.contains(Metakey::Complete), @@ -426,6 +473,11 @@ impl Metadata { /// Version is a string that can be used to identify the version of this entry. /// /// This field may come out from the version control system, like object versioning in AWS S3. + /// + /// # Panics + /// + /// This value is only available when calling on result of `stat` or `list` with + /// [`Metakey::Version`], otherwise it will panic. pub fn version(&self) -> Option<&str> { debug_assert!( self.bit.contains(Metakey::Version) || self.bit.contains(Metakey::Complete), diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 2b479624a..90cf3dee0 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -19,7 +19,6 @@ use std::io::Read; use std::ops::RangeBounds; use bytes::Bytes; -use flagset::FlagSet; use super::operator_functions::*; use crate::raw::*; @@ -153,99 +152,6 @@ impl BlockingOperator { Ok(meta) } - /// Get current metadata with cache in blocking way. - /// - /// `metadata` will check the given query with already cached metadata - /// first. And query from storage if not found. - /// - /// # Notes - /// - /// Use `metadata` if you are working with entries returned by - /// [`Lister`]. It's highly possible that metadata you want - /// has already been cached. - /// - /// You may want to use `stat`, if you: - /// - /// - Want detect the outside changes of file. - /// - Don't want to read from cached file metadata. - /// - /// # Behavior - /// - /// Visiting not fetched metadata will lead to panic in debug build. - /// It must be a bug, please fix it instead. - /// - /// # Examples - /// - /// ## Query already cached metadata - /// - /// By query metadata with `None`, we can only query in-memory metadata - /// cache. In this way, we can make sure that no API call will send. - /// - /// ``` - /// # use anyhow::Result; - /// # use opendal::BlockingOperator; - /// use opendal::Entry; - /// - /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> { - /// let meta = op.metadata(&entry, None)?; - /// // content length COULD be correct. - /// let _ = meta.content_length(); - /// // etag COULD be correct. - /// let _ = meta.etag(); - /// # Ok(()) - /// # } - /// ``` - /// - /// ## Query content length and content type - /// - /// ``` - /// # use anyhow::Result; - /// # use opendal::BlockingOperator; - /// use opendal::Entry; - /// use opendal::Metakey; - /// - /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> { - /// let meta = op.metadata(&entry, { Metakey::ContentLength | Metakey::ContentType })?; - /// // content length MUST be correct. - /// let _ = meta.content_length(); - /// // etag COULD be correct. - /// let _ = meta.etag(); - /// # Ok(()) - /// # } - /// ``` - /// - /// ## Query all metadata - /// - /// By query metadata with `Complete`, we can make sure that we have fetched all metadata of this entry. - /// - /// ``` - /// # use anyhow::Result; - /// # use opendal::BlockingOperator; - /// use opendal::Entry; - /// use opendal::Metakey; - /// - /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> { - /// let meta = op.metadata(&entry, { Metakey::Complete })?; - /// // content length MUST be correct. - /// let _ = meta.content_length(); - /// // etag MUST be correct. - /// let _ = meta.etag(); - /// # Ok(()) - /// # } - /// ``` - pub fn metadata(&self, entry: &Entry, flags: impl Into<FlagSet<Metakey>>) -> Result<Metadata> { - // Check if cached metadata saticifies the query. - if let Some(meta) = entry.metadata() { - if meta.bit().contains(flags) || meta.bit().contains(Metakey::Complete) { - return Ok(meta.clone()); - } - } - - // Else request from backend.. - let meta = self.stat(entry.path())?; - Ok(meta) - } - /// Check if this path exists or not. /// /// # Example diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 4c2fdfa40..c0703a9a1 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -19,7 +19,6 @@ use std::ops::RangeBounds; use std::time::Duration; use bytes::Bytes; -use flagset::FlagSet; use futures::stream; use futures::AsyncReadExt; use futures::Stream; @@ -159,18 +158,13 @@ impl Operator { } } - /// Get current path's metadata **without cache** directly. + /// Get current path's metadata. /// /// # Notes /// - /// Use `stat` if you: - /// - /// - Want to detect the outside changes of path. - /// - Don't want to read from cached metadata. - /// - /// You may want to use `metadata` if you are working with entries - /// returned by [`Lister`]. It's highly possible that metadata - /// you want has already been cached. + /// For fetch metadata of entries returned by [`Lister`], it's better to use [`list_with`] and + /// [`lister_with`] with `metakey` query like `Metakey::ContentLength | Metakey::LastModified` + /// so that we can avoid extra requests. /// /// # Examples /// @@ -245,107 +239,6 @@ impl Operator { fut } - /// Get current metadata with cache. - /// - /// `metadata` will check the given query with already cached metadata - /// first. And query from storage if not found. - /// - /// # Notes - /// - /// Use `metadata` if you are working with entries returned by - /// [`Lister`]. It's highly possible that metadata you want - /// has already been cached. - /// - /// You may want to use `stat`, if you: - /// - /// - Want to detect the outside changes of path. - /// - Don't want to read from cached metadata. - /// - /// # Behavior - /// - /// Visiting not fetched metadata will lead to panic in debug build. - /// It must be a bug, please fix it instead. - /// - /// # Examples - /// - /// ## Query already cached metadata - /// - /// By querying metadata with `None`, we can only query in-memory metadata - /// cache. In this way, we can make sure that no API call will be sent. - /// - /// ``` - /// # use anyhow::Result; - /// # use opendal::Operator; - /// use opendal::Entry; - /// # #[tokio::main] - /// # async fn test(op: Operator, entry: Entry) -> Result<()> { - /// let meta = op.metadata(&entry, None).await?; - /// // content length COULD be correct. - /// let _ = meta.content_length(); - /// // etag COULD be correct. - /// let _ = meta.etag(); - /// # Ok(()) - /// # } - /// ``` - /// - /// ## Query content length and content type - /// - /// ``` - /// # use anyhow::Result; - /// # use opendal::Operator; - /// use opendal::Entry; - /// use opendal::Metakey; - /// - /// # #[tokio::main] - /// # async fn test(op: Operator, entry: Entry) -> Result<()> { - /// let meta = op - /// .metadata(&entry, Metakey::ContentLength | Metakey::ContentType) - /// .await?; - /// // content length MUST be correct. - /// let _ = meta.content_length(); - /// // etag COULD be correct. - /// let _ = meta.etag(); - /// # Ok(()) - /// # } - /// ``` - /// - /// ## Query all metadata - /// - /// By querying metadata with `Complete`, we can make sure that we have fetched all metadata of this entry. - /// - /// ``` - /// # use anyhow::Result; - /// # use opendal::Operator; - /// use opendal::Entry; - /// use opendal::Metakey; - /// - /// # #[tokio::main] - /// # async fn test(op: Operator, entry: Entry) -> Result<()> { - /// let meta = op.metadata(&entry, Metakey::Complete).await?; - /// // content length MUST be correct. - /// let _ = meta.content_length(); - /// // etag MUST be correct. - /// let _ = meta.etag(); - /// # Ok(()) - /// # } - /// ``` - pub async fn metadata( - &self, - entry: &Entry, - flags: impl Into<FlagSet<Metakey>>, - ) -> Result<Metadata> { - // Check if cached metadata saticifies the query. - if let Some(meta) = entry.metadata() { - if meta.bit().contains(flags) || meta.bit().contains(Metakey::Complete) { - return Ok(meta.clone()); - } - } - - // Else request from backend.. - let meta = self.stat(entry.path()).await?; - Ok(meta) - } - /// Check if this path exists or not. /// /// # Example @@ -1270,13 +1163,13 @@ impl Operator { /// /// # Notes /// - /// ## For listing recursively + /// ## Listing recursively /// /// This function only read the children of the given directory. To read /// all entries recursively, use `Operator::list_with("path").delimiter("")` /// instead. /// - /// ## For streaming + /// ## Streaming /// /// This function will read all entries in the given directory. It could /// take very long time and consume a lot of memory if the directory @@ -1285,6 +1178,11 @@ impl Operator { /// In order to avoid this, you can use [`Operator::lister`] to list entries in /// a streaming way. /// + /// ## Metadata + /// + /// The only metadata that is guaranteed to be available is the `Mode`. + /// For fetching more metadata, please use [`Operator::list_with`] and `metakey`. + /// /// # Examples /// /// ```no_run @@ -1296,13 +1194,12 @@ impl Operator { /// # async fn test(op: Operator) -> Result<()> { /// let mut entries = op.list("path/to/dir/").await?; /// for entry in entries { - /// let meta = op.metadata(&entry, Metakey::Mode).await?; - /// match meta.mode() { + /// match entry.metadata().mode() { /// EntryMode::FILE => { /// println!("Handling file") /// } /// EntryMode::DIR => { - /// println!("Handling dir like start a new list via meta.path()") + /// println!("Handling dir {}", entry.path()) /// } /// EntryMode::Unknown => continue, /// } @@ -1327,6 +1224,11 @@ impl Operator { /// In order to avoid this, you can use [`Operator::lister`] to list entries in /// a streaming way. /// + /// ## Metadata + /// + /// The only metadata that is guaranteed to be available is the `Mode`. + /// For fetching more metadata, please specify the `metakey`. + /// /// # Examples /// /// ## List entries with prefix @@ -1342,8 +1244,7 @@ impl Operator { /// # async fn test(op: Operator) -> Result<()> { /// let mut entries = op.list_with("prefix/").delimiter("").await?; /// for entry in entries { - /// let meta = op.metadata(&entry, Metakey::Mode).await?; - /// match meta.mode() { + /// match entry.metadata().mode() { /// EntryMode::FILE => { /// println!("Handling file") /// } @@ -1356,6 +1257,32 @@ impl Operator { /// # Ok(()) /// # } /// ``` + /// + /// ## List entries with metakey for more metadata + /// + /// ```no_run + /// # use anyhow::Result; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::Operator; + /// # #[tokio::main] + /// # async fn test(op: Operator) -> Result<()> { + /// let mut entries = op.list_with("dir/").metakey(Metakey::ContentLength|Metakey::LastModified).await?; + /// for entry in entries { + /// let meta = entry.metadata(); + /// match meta.mode() { + /// EntryMode::FILE => { + /// println!("Handling file {} with size {}", entry.path(), meta.content_length()) + /// } + /// EntryMode::DIR => { + /// println!("Handling dir {}", entry.path()) + /// } + /// EntryMode::Unknown => continue, + /// } + /// } + /// # Ok(()) + /// # } + /// ``` pub fn list_with(&self, path: &str) -> FutureList { let path = normalize_path(path); @@ -1375,8 +1302,7 @@ impl Operator { .with_context("path", &path)); } - let (_, pager) = inner.list(&path, args).await?; - let lister = Lister::new(pager); + let lister = Lister::create(inner, &path, args).await?; lister.try_collect().await }; @@ -1392,6 +1318,19 @@ impl Operator { /// /// An error will be returned if given path doesn't end with `/`. /// + /// # Notes + /// + /// ## Listing recursively + /// + /// This function only read the children of the given directory. To read + /// all entries recursively, use [`Operator::lister_with`] and `delimiter("")` + /// instead. + /// + /// ## Metadata + /// + /// The only metadata that is guaranteed to be available is the `Mode`. + /// For fetching more metadata, please use [`Operator::lister_with`] and `metakey`. + /// /// # Examples /// /// ```no_run @@ -1405,8 +1344,7 @@ impl Operator { /// # async fn test(op: Operator) -> Result<()> { /// let mut ds = op.lister("path/to/dir/").await?; /// while let Some(mut de) = ds.try_next().await? { - /// let meta = op.metadata(&de, Metakey::Mode).await?; - /// match meta.mode() { + /// match de.metadata().mode() { /// EntryMode::FILE => { /// println!("Handling file") /// } @@ -1447,14 +1385,13 @@ impl Operator { /// .limit(10) /// .start_after("start") /// .await?; - /// while let Some(mut de) = ds.try_next().await? { - /// let meta = op.metadata(&de, Metakey::Mode).await?; - /// match meta.mode() { + /// while let Some(mut entry) = ds.try_next().await? { + /// match entry.metadata().mode() { /// EntryMode::FILE => { - /// println!("Handling file") + /// println!("Handling file {}", entry.path()) /// } /// EntryMode::DIR => { - /// println!("Handling dir like start a new list via meta.path()") + /// println!("Handling dir {}", entry.path()) /// } /// EntryMode::Unknown => continue, /// } @@ -1475,14 +1412,42 @@ impl Operator { /// # #[tokio::main] /// # async fn test(op: Operator) -> Result<()> { /// let mut ds = op.lister_with("path/to/dir/").delimiter("").await?; - /// while let Some(mut de) = ds.try_next().await? { - /// let meta = op.metadata(&de, Metakey::Mode).await?; + /// while let Some(mut entry) = ds.try_next().await? { + /// match entry.metadata().mode() { + /// EntryMode::FILE => { + /// println!("Handling file {}", entry.path()) + /// } + /// EntryMode::DIR => { + /// println!("Handling dir {}", entry.path()) + /// } + /// EntryMode::Unknown => continue, + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// ## List files with required metadata + /// + /// ```no_run + /// # use anyhow::Result; + /// # use futures::io; + /// use futures::TryStreamExt; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::Operator; + /// # #[tokio::main] + /// # async fn test(op: Operator) -> Result<()> { + /// let mut ds = op.lister_with("path/to/dir/") + /// .metakey(Metakey::ContentLength | Metakey::LastModified).await?; + /// while let Some(mut entry) = ds.try_next().await? { + /// let meta = entry.metadata(); /// match meta.mode() { /// EntryMode::FILE => { - /// println!("Handling file") + /// println!("Handling file {} with size {}", entry.path(), meta.content_length()) /// } /// EntryMode::DIR => { - /// println!("Handling dir like start a new list via meta.path()") + /// println!("Handling dir {}", entry.path()) /// } /// EntryMode::Unknown => continue, /// } @@ -1509,9 +1474,7 @@ impl Operator { .with_context("path", &path)); } - let (_, pager) = inner.list(&path, args).await?; - - Ok(Lister::new(pager)) + Lister::create(inner, &path, args).await }; Box::pin(fut) },
