alamb commented on code in PR #7019:
URL: https://github.com/apache/arrow-rs/pull/7019#discussion_r1945230858
##########
object_store/src/local.rs:
##########
@@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem {
}
}
+impl LocalFileSystem {
+ fn list_with_maybe_offset(
+ &self,
+ prefix: Option<&Path>,
+ maybe_offset: Option<&Path>,
+ ) -> BoxStream<'static, Result<ObjectMeta>> {
+ let config = Arc::clone(&self.config);
+
+ let root_path = match prefix {
+ Some(prefix) => match config.prefix_to_filesystem(prefix) {
+ Ok(path) => path,
+ Err(e) => return
futures::future::ready(Err(e)).into_stream().boxed(),
+ },
+ None => config.root.to_file_path().unwrap(),
+ };
+
+ let walkdir = WalkDir::new(root_path)
+ // Don't include the root directory itself
+ .min_depth(1)
+ .follow_links(true);
+
+ let maybe_offset = maybe_offset.cloned();
+
+ let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
+ // Apply offset filter before proceeding, to reduce statx file
system calls
+ // This matters for NFS mounts
+ if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(),
result_dir_entry.as_ref()) {
+ let location = config.filesystem_to_path(entry.path());
+ match location {
+ Ok(path) if path <= *offset => return None,
+ Err(e) => return Some(Err(e)),
+ _ => {}
+ }
+ }
+
+ let entry = match
convert_walkdir_result(result_dir_entry).transpose()? {
Review Comment:
My understanding is that the whole point of this PR is to avoid calling
`convert_walkdir_result` for files that will not be opened
##########
object_store/src/local.rs:
##########
@@ -1401,6 +1432,66 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn test_path_with_offset() {
+ let root = TempDir::new().unwrap();
+ let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ let root_path = root.path();
+ for i in 0..5 {
+ let filename = format!("test{}.parquet", i);
+ let file = root_path.join(filename);
+ std::fs::write(file, "test").unwrap();
+ }
+ let filter_str = "test";
+ let filter = String::from(filter_str);
+ let offset_str = filter + "1";
+ let offset = Path::from(offset_str.clone());
+
+ // Use list_with_offset to retrieve files
+ let res = integration.list_with_offset(None, &offset);
+ let offset_paths: Vec<_> = res.map_ok(|x|
x.location).try_collect().await.unwrap();
+ let mut offset_files: Vec<_> = offset_paths
+ .iter()
+ .map(|x| String::from(x.filename().unwrap()))
+ .collect();
+
+ // Check result with direct filesystem read
Review Comment:
I think this depends on the file system returning the same order each time.
Again, this is probably ok, but it seems like it could result in a
non-deterministic test failure over time.
##########
object_store/src/local.rs:
##########
@@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem {
}
}
+impl LocalFileSystem {
+ fn list_with_maybe_offset(
+ &self,
+ prefix: Option<&Path>,
+ maybe_offset: Option<&Path>,
+ ) -> BoxStream<'static, Result<ObjectMeta>> {
+ let config = Arc::clone(&self.config);
+
+ let root_path = match prefix {
+ Some(prefix) => match config.prefix_to_filesystem(prefix) {
+ Ok(path) => path,
+ Err(e) => return
futures::future::ready(Err(e)).into_stream().boxed(),
+ },
+ None => config.root.to_file_path().unwrap(),
+ };
+
+ let walkdir = WalkDir::new(root_path)
Review Comment:
I did some digging and I don't think WalkDir guarantees anything about the
sort order of the directory entries.
Thus, if the OS returns a different order on the next call to
`list_with_offset`, this may end up with a different result.
However, I think the existing default impl of `list_with_offset` (which just
calls list() and takes the offset) has the same problem, so I don't think it
needs to be fixed in this PR.
##########
object_store/src/local.rs:
##########
@@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem {
}
}
+impl LocalFileSystem {
+ fn list_with_maybe_offset(
+ &self,
+ prefix: Option<&Path>,
+ maybe_offset: Option<&Path>,
+ ) -> BoxStream<'static, Result<ObjectMeta>> {
+ let config = Arc::clone(&self.config);
+
+ let root_path = match prefix {
+ Some(prefix) => match config.prefix_to_filesystem(prefix) {
+ Ok(path) => path,
+ Err(e) => return
futures::future::ready(Err(e)).into_stream().boxed(),
+ },
+ None => config.root.to_file_path().unwrap(),
+ };
+
+ let walkdir = WalkDir::new(root_path)
Review Comment:
It does look to me like you can explicitly set an order via
https://docs.rs/walkdir/latest/walkdir/struct.WalkDir.html#method.sort_by
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]