This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bf4a6c315 Add a custom implementation `LocalFileSystem::list_with_offset`  (#7019)
5bf4a6c315 is described below

commit 5bf4a6c3154b5754e0ffdad107b4e8c6a800d78b
Author: Corwin Joy <[email protected]>
AuthorDate: Sat Feb 8 08:27:40 2025 -0800

    Add a custom implementation `LocalFileSystem::list_with_offset`  (#7019)
    
    * Initial change from Daniel.
    
    * Upgrade unit test to be more generic.
    
    * Add comments on why we have filter
    
    * Cleanup unit tests.
    
    * Update object_store/src/local.rs
    
    Co-authored-by: Adam Reeve <[email protected]>
    
    * Add changes suggested by Adam.
    
    * Cleanup match error.
    
    * Apply formatting changes suggested by cargo +stable fmt --all.
    
    * Apply cosmetic changes suggested by clippy.
    
    * Upgrade test_path_with_offset to create a temporary directory + files for testing rather than pointing to an existing dir.
    
    ---------
    
    Co-authored-by: Adam Reeve <[email protected]>
---
 object_store/src/local.rs | 219 ++++++++++++++++++++++++++++++++--------------
 1 file changed, 155 insertions(+), 64 deletions(-)

diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 6fef4614f9..ccf6e34df8 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -483,71 +483,15 @@ impl ObjectStore for LocalFileSystem {
     }
 
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
-        let config = Arc::clone(&self.config);
-
-        let root_path = match prefix {
-            Some(prefix) => match config.prefix_to_filesystem(prefix) {
-                Ok(path) => path,
-                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
-            },
-            None => self.config.root.to_file_path().unwrap(),
-        };
-
-        let walkdir = WalkDir::new(root_path)
-            // Don't include the root directory itself
-            .min_depth(1)
-            .follow_links(true);
-
-        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
-            let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
-                Ok(entry) => entry,
-                Err(e) => return Some(Err(e)),
-            };
-
-            if !entry.path().is_file() {
-                return None;
-            }
-
-            match config.filesystem_to_path(entry.path()) {
-                Ok(path) => match is_valid_file_path(&path) {
-                    true => convert_entry(entry, path).transpose(),
-                    false => None,
-                },
-                Err(e) => Some(Err(e)),
-            }
-        });
-
-        // If no tokio context, return iterator directly as no
-        // need to perform chunked spawn_blocking reads
-        if tokio::runtime::Handle::try_current().is_err() {
-            return futures::stream::iter(s).boxed();
-        }
-
-        // Otherwise list in batches of CHUNK_SIZE
-        const CHUNK_SIZE: usize = 1024;
-
-        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
-        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
-            if buffer.is_empty() {
-                (s, buffer) = tokio::task::spawn_blocking(move || {
-                    for _ in 0..CHUNK_SIZE {
-                        match s.next() {
-                            Some(r) => buffer.push_back(r),
-                            None => break,
-                        }
-                    }
-                    (s, buffer)
-                })
-                .await?;
-            }
+        self.list_with_maybe_offset(prefix, None)
+    }
 
-            match buffer.pop_front() {
-                Some(Err(e)) => Err(e),
-                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
-                None => Ok(None),
-            }
-        })
-        .boxed()
+    fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> BoxStream<'static, Result<ObjectMeta>> {
+        self.list_with_maybe_offset(prefix, Some(offset))
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem {
     }
 }
 
+impl LocalFileSystem {
+    /// Shared implementation of `list` and `list_with_offset`: lists files under `prefix`
+    /// exactly like `list`, but skips entries whose location is `<= maybe_offset` (if set)
+    fn list_with_maybe_offset(
+        &self,
+        prefix: Option<&Path>,
+        maybe_offset: Option<&Path>,
+    ) -> BoxStream<'static, Result<ObjectMeta>> {
+        let config = Arc::clone(&self.config);
+
+        let root_path = match prefix {
+            Some(prefix) => match config.prefix_to_filesystem(prefix) {
+                Ok(path) => path,
+                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
+            },
+            None => config.root.to_file_path().unwrap(),
+        };
+
+        let walkdir = WalkDir::new(root_path)
+            // Don't include the root directory itself
+            .min_depth(1)
+            .follow_links(true);
+
+        let maybe_offset = maybe_offset.cloned();
+
+        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
+            // Apply offset filter before proceeding, to reduce statx file system calls
+            // This matters for NFS mounts. Conversion errors are not reported here: they
+            // surface below for files only, so this never errors where `list` would not
+            if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) {
+                if matches!(config.filesystem_to_path(entry.path()), Ok(path) if path <= *offset) {
+                    return None;
+                }
+            }
+
+            let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
+                Ok(entry) => entry,
+                Err(e) => return Some(Err(e)),
+            };
+
+            if !entry.path().is_file() {
+                return None;
+            }
+
+            match config.filesystem_to_path(entry.path()) {
+                Ok(path) => match is_valid_file_path(&path) {
+                    true => convert_entry(entry, path).transpose(),
+                    false => None,
+                },
+                Err(e) => Some(Err(e)),
+            }
+        });
+
+        // If no tokio context, return iterator directly as no
+        // need to perform chunked spawn_blocking reads
+        if tokio::runtime::Handle::try_current().is_err() {
+            return futures::stream::iter(s).boxed();
+        }
+
+        // Otherwise list in batches of CHUNK_SIZE
+        const CHUNK_SIZE: usize = 1024;
+
+        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
+        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
+            if buffer.is_empty() {
+                (s, buffer) = tokio::task::spawn_blocking(move || {
+                    for _ in 0..CHUNK_SIZE {
+                        match s.next() {
+                            Some(r) => buffer.push_back(r),
+                            None => break,
+                        }
+                    }
+                    (s, buffer)
+                })
+                .await?;
+            }
+
+            match buffer.pop_front() {
+                Some(Err(e)) => Err(e),
+                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
+                None => Ok(None),
+            }
+        })
+        .boxed()
+    }
+}
+
 /// Creates the parent directories of `path` or returns an error based on `source` if no parent
 fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
     let parent = path.parent().ok_or_else(|| {
@@ -1459,6 +1490,66 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_path_with_offset() {
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+        let root_path = root.path();
+        for i in 0..5 {
+            let filename = format!("test{}.parquet", i);
+            let file = root_path.join(filename);
+            std::fs::write(file, "test").unwrap();
+        }
+        let filter_str = "test";
+        let filter = String::from(filter_str);
+        let offset_str = filter + "1";
+        let offset = Path::from(offset_str.clone());
+
+        // Use list_with_offset to retrieve files
+        let res = integration.list_with_offset(None, &offset);
+        let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap();
+        let mut offset_files: Vec<_> = offset_paths
+            .iter()
+            .map(|x| String::from(x.filename().unwrap()))
+            .collect();
+
+        // Check result with direct filesystem read
+        let files = fs::read_dir(root_path).unwrap();
+        let filtered_files = files
+            .filter_map(Result::ok)
+            .filter_map(|d| {
+                d.file_name().to_str().and_then(|f| {
+                    if f.contains(filter_str) {
+                        Some(String::from(f))
+                    } else {
+                        None
+                    }
+                })
+            })
+            .collect::<Vec<_>>();
+
+        let mut expected_offset_files: Vec<_> = filtered_files
+            .iter()
+            .filter(|s| **s > offset_str)
+            .cloned()
+            .collect();
+
+        // Neither `read_dir` nor `list_with_offset` guarantees an ordering, so sort both
+        offset_files.sort();
+        expected_offset_files.sort();
+
+        // Pin the reference result too, so a bug shared by the test's own filter and the
+        // implementation cannot go unnoticed: "test0.parquet" sorts before the offset
+        // "test1", while "test1.parquet" (which has the offset as a prefix) sorts after it
+        assert_eq!(
+            expected_offset_files,
+            ["test1.parquet", "test2.parquet", "test3.parquet", "test4.parquet"]
+        );
+
+        assert_eq!(offset_files, expected_offset_files);
+    }
+
     #[tokio::test]
     async fn filesystem_filename_with_percent() {
         let temp_dir = TempDir::new().unwrap();

Reply via email to