xudong963 commented on code in PR #15432:
URL: https://github.com/apache/datafusion/pull/15432#discussion_r2016725230
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1181,6 +1175,92 @@ impl ListingTable {
}
}
+/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
+///
+/// This function collects files from the provided stream until either:
+/// 1. The stream is exhausted
+/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
+///
+/// # Arguments
+/// * `files` - A stream of `Result<PartitionedFile>` items to process
+/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
+/// once the accumulated number of rows exceeds this limit
+/// * `collect_stats` - Whether to collect and accumulate statistics from the files
+///
+/// # Returns
+/// A `Result` containing a `FileGroup` with the collected files
+/// and a boolean indicating whether the statistics are inexact.
+///
+/// # Note
+/// The function will continue processing files if statistics are not available or if the
+/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
+/// but files will still be collected.
+async fn get_files_with_limit(
+    files: impl Stream<Item = Result<PartitionedFile>>,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, bool)> {
+    let mut file_group = FileGroup::default();
+    // Fusing the stream allows us to call next safely even once it is finished.
+    let mut all_files = Box::pin(files.fuse());
+    let mut num_rows = Precision::<usize>::Absent;
+    while let Some(first_file) = all_files.next().await {
Review Comment:
I agree, the current nested loop is annoying.
The code I have in mind, using an FSM (finite-state machine), looks like this:
```rust
enum ProcessingState {
    ReadingFiles,
    ReachedLimit,
}

let mut state = ProcessingState::ReadingFiles;
let mut num_rows = Precision::<usize>::Absent;

while let Some(file_result) = all_files.next().await {
    // Early exit if we've already reached our limit
    if matches!(state, ProcessingState::ReachedLimit) {
        break;
    }

    let file = file_result?;

    // Update file statistics regardless of state
    if collect_stats {
        if let Some(file_stats) = &file.statistics {
            num_rows = if file_group.is_empty() {
                // For the first file, just take its row count
                file_stats.num_rows
            } else {
                // For subsequent files, accumulate the counts
                crate::datasource::statistics::add_row_stats(
                    num_rows,
                    file_stats.num_rows,
                )
            };
        }
    }

    // Always add the file to our group
    file_group.push(file);

    // Check if we've hit the limit (if one was specified)
    if let Some(limit) = limit {
        if let Precision::Exact(row_count) = num_rows {
            if row_count > limit {
                state = ProcessingState::ReachedLimit;
            }
        }
    }
}
```
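For reference, the same state-machine idea can be sketched as a self-contained, synchronous program. This is an illustration under stated assumptions, not DataFusion code: `Precision`, `File`, `State`, and `collect_with_limit` are hypothetical simplified stand-ins, a plain iterator replaces the async `Stream`, and `Precision::add` only approximates the combining rules (Exact + Exact stays Exact, an Inexact input makes the sum Inexact, an Absent input makes it Absent). One deliberate difference from the snippet above: the state is checked right after a file is pushed, so the loop stops without pulling one more file from the input only to discard it.

```rust
// Hypothetical, simplified stand-ins; NOT the real DataFusion types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl Precision {
    /// Approximate combining rules: Exact + Exact stays Exact, any Inexact
    /// input makes the sum Inexact, and an Absent input makes it Absent.
    fn add(self, other: Precision) -> Precision {
        match (self, other) {
            (Self::Exact(a), Self::Exact(b)) => Self::Exact(a + b),
            (Self::Exact(a), Self::Inexact(b))
            | (Self::Inexact(a), Self::Exact(b))
            | (Self::Inexact(a), Self::Inexact(b)) => Self::Inexact(a + b),
            _ => Self::Absent,
        }
    }
}

#[derive(Debug)]
struct File {
    name: &'static str,
    num_rows: Option<Precision>, // None models "no statistics for this file"
}

/// States of the hypothetical file-collection FSM.
enum State {
    ReadingFiles,
    ReachedLimit,
}

/// Collect files until the input is exhausted or the exact accumulated
/// row count exceeds `limit`. Returns the files and the accumulated count.
fn collect_with_limit<I>(files: I, limit: Option<usize>, collect_stats: bool) -> (Vec<File>, Precision)
where
    I: IntoIterator<Item = File>,
{
    let mut group = Vec::new();
    let mut num_rows = Precision::Absent;
    let mut state = State::ReadingFiles;

    for file in files {
        if collect_stats {
            if let Some(file_rows) = file.num_rows {
                // First file: take its count; later files: accumulate.
                num_rows = if group.is_empty() { file_rows } else { num_rows.add(file_rows) };
            }
        }
        group.push(file);

        // Only an exact count above the limit moves us to ReachedLimit.
        if let (Some(limit), Precision::Exact(rows)) = (limit, num_rows) {
            if rows > limit {
                state = State::ReachedLimit;
            }
        }
        // Checking here (not at the top of the loop) avoids pulling an
        // extra file from the input that would just be thrown away.
        if matches!(state, State::ReachedLimit) {
            break;
        }
    }
    (group, num_rows)
}
```

With `limit = Some(100)` and exact per-file counts of 60, 50 and 10 rows, the sketch keeps the first two files (110 rows) and stops before the third; inexact or missing counts never trigger the limit, mirroring the `Precision::Exact` check in the proposal.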
Given that this PR mainly moves the old code into a new method to make it more flexible,
I'd rather not make further changes here, but I'm happy to do this refactor in a
separate PR.
What do you think?