Copilot commented on code in PR #507:
URL: https://github.com/apache/hudi-rs/pull/507#discussion_r2659241275


##########
python/src/internal.rs:
##########
@@ -348,154 +355,106 @@ impl HudiTable {
         })
     }
 
-    #[pyo3(signature = (num_splits, filters=None))]
-    fn get_file_slices_splits(
-        &self,
-        num_splits: usize,
-        filters: Option<Vec<(String, String, String)>>,
-        py: Python,
-    ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
-        py.detach(|| {
-            let file_slices = rt()
-                .block_on(
-                    self.inner
-                        .get_file_slices_splits(num_splits, 
filters.unwrap_or_default()),
-                )
-                .map_err(PythonError::from)?;
-            Ok(file_slices
-                .iter()
-                .map(|inner_vec| 
inner_vec.iter().map(HudiFileSlice::from).collect())
-                .collect())
-        })
-    }
-
-    #[pyo3(signature = (num_splits, timestamp, filters=None))]
-    fn get_file_slices_splits_as_of(
-        &self,
-        num_splits: usize,
-        timestamp: &str,
-        filters: Option<Vec<(String, String, String)>>,
-        py: Python,
-    ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
-        py.detach(|| {
-            let file_slices = rt()
-                .block_on(self.inner.get_file_slices_splits_as_of(
-                    num_splits,
-                    timestamp,
-                    filters.unwrap_or_default(),
-                ))
-                .map_err(PythonError::from)?;
-            Ok(file_slices
-                .iter()
-                .map(|inner_vec| 
inner_vec.iter().map(HudiFileSlice::from).collect())
-                .collect())
-        })
-    }
-
-    #[pyo3(signature = (filters=None))]
+    /// Get file slices with optional partition filters and time-travel 
timestamp.
+    ///
+    /// Args:
+    ///     filters: Optional partition filter tuples in (field, op, value) 
format
+    ///     timestamp: Optional timestamp for time-travel queries
+    #[pyo3(signature = (filters=None, timestamp=None))]
     fn get_file_slices(
         &self,
         filters: Option<Vec<(String, String, String)>>,
+        timestamp: Option<&str>,
         py: Python,
     ) -> PyResult<Vec<HudiFileSlice>> {
         py.detach(|| {
+            let options = build_read_options(filters, 
timestamp).map_err(PythonError::from)?;
             let file_slices = rt()
-                
.block_on(self.inner.get_file_slices(filters.unwrap_or_default()))
+                .block_on(self.inner.get_file_slices(options))
                 .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(HudiFileSlice::from).collect())
         })
     }
 
-    #[pyo3(signature = (timestamp, filters=None))]
-    fn get_file_slices_as_of(
-        &self,
-        timestamp: &str,
-        filters: Option<Vec<(String, String, String)>>,
-        py: Python,
-    ) -> PyResult<Vec<HudiFileSlice>> {
-        py.detach(|| {
-            let file_slices = rt()
-                .block_on(
-                    self.inner
-                        .get_file_slices_as_of(timestamp, 
filters.unwrap_or_default()),
-                )
-                .map_err(PythonError::from)?;
-            Ok(file_slices.iter().map(HudiFileSlice::from).collect())
-        })
-    }
-
-    #[pyo3(signature = (start_timestamp=None, end_timestamp=None))]
-    fn get_file_slices_between(
+    /// Get file slices for incremental reads between timestamps.
+    ///
+    /// Args:
+    ///     start_timestamp: Start timestamp (exclusive)
+    ///     end_timestamp: Optional end timestamp (inclusive), defaults to 
latest
+    #[pyo3(signature = (start_timestamp, end_timestamp=None))]
+    fn get_file_slices_incremental(
         &self,
-        start_timestamp: Option<&str>,
+        start_timestamp: &str,
         end_timestamp: Option<&str>,
         py: Python,
     ) -> PyResult<Vec<HudiFileSlice>> {
         py.detach(|| {
+            let mut options = 
ReadOptions::new().from_timestamp(start_timestamp);
+            if let Some(end_ts) = end_timestamp {
+                options = options.to_timestamp(end_ts);
+            }
             let file_slices = rt()
-                .block_on(
-                    self.inner
-                        .get_file_slices_between(start_timestamp, 
end_timestamp),
-                )
+                .block_on(self.inner.get_file_slices_incremental(options))
                 .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(HudiFileSlice::from).collect())
         })
     }
 
-    #[pyo3(signature = (options=None))]
-    fn create_file_group_reader_with_options(
-        &self,
-        options: Option<HashMap<String, String>>,
-    ) -> PyResult<HudiFileGroupReader> {
+    /// Create a file group reader for this table.
+    fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
+        let options = ReadOptions::new();
         let fg_reader = self
             .inner
-            .create_file_group_reader_with_options(options.unwrap_or_default())
+            .create_file_group_reader(&options)
             .map_err(PythonError::from)?;
         Ok(HudiFileGroupReader { inner: fg_reader })

Review Comment:
   The Python bindings create `ReadOptions` with `ReadOptions::new()` on lines 99
and 405 but ignore all of the options that could be passed from Python (such as
batch_size, projection, and row predicates). As a result, Python users cannot
control these aspects of file-slice reading.
   
   Consider either:
   1. Adding parameters to expose these options to Python
   2. Documenting that these options are not yet supported in the Python API



##########
crates/core/src/table/mod.rs:
##########
@@ -619,8 +391,8 @@ impl Table {
         Ok(file_slices)
     }
 
-    /// Create a [FileGroupReader] using the [Table]'s Hudi configs, and 
overwriting options.
-    pub fn create_file_group_reader_with_options<I, K, V>(
+    /// Create a [FileGroupReader] using the [Table]'s Hudi configs.
+    pub(crate) fn create_file_group_reader_with_options<I, K, V>(

Review Comment:
   The visibility of `create_file_group_reader_with_options` has been changed 
from `pub` to `pub(crate)` on line 395. This is a breaking API change that 
could affect external crates that depend on this function. If this is 
intentional for the new design where users should use 
`create_file_group_reader` instead, consider documenting this breaking change 
or providing a deprecation period.
   ```suggestion
       pub fn create_file_group_reader_with_options<I, K, V>(
   ```



##########
crates/core/src/file_group/reader.rs:
##########
@@ -275,10 +282,18 @@ impl FileGroupReader {
             .into();
         let base_file_only = log_file_paths.is_empty() || use_read_optimized;
 
+        let projection = options.projection.as_deref();
+        let row_predicate = options.row_predicate.as_ref();
+
+        // For now, only projection is supported for base file only reads
+        // Log file merging with projection/predicate is more complex and 
deferred

Review Comment:
   The comment on line 288 states "For now, only projection is supported for 
base file only reads" but the code immediately below (lines 291-292) appears to 
pass both projection and row_predicate to `read_parquet_file_with_opts`. This 
suggests either:
   
   1. The comment is outdated and row predicates are actually supported
   2. The implementation is incomplete and row predicates are incorrectly 
passed but not properly handled
   
   Review and update either the comment or the implementation to ensure they 
match.
   ```suggestion
           // For base file only reads, apply projection and row predicates via 
read_parquet_file_with_opts.
           // For MOR tables with log files, projection and predicates are 
applied after merging log records.
   ```



##########
cpp/src/lib.rs:
##########
@@ -87,74 +84,41 @@ pub fn new_file_group_reader_with_options(
 }
 
 impl HudiFileGroupReader {
-    pub fn read_file_slice_by_base_file_path(
+    pub fn read_file_slice_by_paths(
         &self,
-        relative_path: &CxxString,
+        base_file_path: &CxxString,
+        log_file_paths: &CxxVector<CxxString>,
     ) -> *mut ffi::ArrowArrayStream {
-        let relative_path = relative_path
+        let base_file_path = base_file_path
             .to_str()
             .expect("Failed to convert CxxString to str: Invalid UTF-8 
sequence");
 
-        let record_batch = self
-            .inner
-            .read_file_slice_by_base_file_path_blocking(relative_path)
-            .expect("Failed to read file batch");
-        let schema = record_batch.schema();
-
-        create_raw_pointer_for_record_batches(vec![record_batch], schema)
-    }
-
-    pub fn read_file_slice(&self, file_slice: &HudiFileSlice) -> *mut 
ffi::ArrowArrayStream {
-        let record_batch = self
-            .inner
-            .read_file_slice_blocking(&file_slice.inner)
-            .expect("Failed to read file slice");
-        let schema = record_batch.schema();
+        let log_file_paths: Vec<String> = log_file_paths
+            .iter()
+            .map(|path| {
+                path.to_str()
+                    .expect("Failed to convert CxxString to str: Invalid UTF-8 
sequence")
+                    .to_string()
+            })
+            .collect();
+
+        let options = ReadOptions::new();
+        let stream = rt()
+            .block_on(
+                self.inner
+                    .read_file_slice_by_paths(base_file_path, log_file_paths, 
&options),
+            )
+            .expect("Failed to read file slice by paths");
+
+        let batches: Vec<_> = rt()
+            .block_on(stream.try_collect())
+            .expect("Failed to collect record batches");
+
+        if batches.is_empty() {
+            panic!("No record batches read from file slice");

Review Comment:
   In the C++ bindings, line 118 panics if no record batches are read. This is 
problematic because:
   
   1. Empty results are valid (e.g., empty table, all rows filtered out)
   2. Panicking in FFI code can cause undefined behavior
   3. The error should be propagated to C++ via the FFI boundary properly
   
   Instead of panicking, handle the empty case gracefully by returning either an
empty stream or a proper error result that C++ can handle.
   ```suggestion
               // No record batches were read; return a null pointer so the 
caller can
               // treat this as an empty result or error as appropriate.
               return std::ptr::null_mut();
   ```



##########
crates/core/src/file_group/reader.rs:
##########
@@ -91,21 +93,54 @@ impl FileGroupReader {
         K: AsRef<str>,
         V: Into<String>,
     {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async {
-                let mut resolver = OptionResolver::new_with_options(base_uri, 
options);
-                resolver.resolve_options().await?;
-                let hudi_configs = 
Arc::new(HudiConfigs::new(resolver.hudi_options));
-                let storage =
-                    Storage::new(Arc::new(resolver.storage_options), 
hudi_configs.clone())?;
-
-                Ok(Self {
-                    hudi_configs,
-                    storage,
-                })
+        // Collect options upfront so we can move them across thread 
boundaries if needed
+        let options_vec: Vec<(String, String)> = options
+            .into_iter()
+            .map(|(k, v)| (k.as_ref().to_string(), v.into()))
+            .collect();
+        let base_uri = base_uri.to_string();
+
+        // Helper function to run the async resolution
+        async fn resolve_reader(
+            base_uri: String,
+            options_vec: Vec<(String, String)>,
+        ) -> Result<FileGroupReader> {
+            let mut resolver = OptionResolver::new_with_options(&base_uri, 
options_vec);
+            resolver.resolve_options().await?;
+            let hudi_configs = 
Arc::new(HudiConfigs::new(resolver.hudi_options));
+            let storage =
+                Storage::new(Arc::new(resolver.storage_options), 
hudi_configs.clone())?;
+
+            Ok(FileGroupReader {
+                hudi_configs,
+                storage,
             })
+        }
+
+        // Check if we're already in a tokio runtime
+        match tokio::runtime::Handle::try_current() {
+            Ok(_handle) => {
+                // We're inside a runtime, spawn a new thread with its own 
runtime
+                std::thread::scope(|s| {
+                    s.spawn(|| {
+                        tokio::runtime::Builder::new_current_thread()
+                            .enable_all()
+                            .build()
+                            .unwrap()
+                            .block_on(resolve_reader(base_uri, options_vec))
+                    })
+                    .join()
+                    .unwrap()
+                })
+            }
+            Err(_) => {
+                // No runtime, create one
+                tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()?
+                    .block_on(resolve_reader(base_uri, options_vec))
+            }
+        }

Review Comment:
   The runtime creation and thread spawning logic in the `new` method is 
complex and could have concurrency issues. When already in a tokio runtime, 
spawning a new thread with `std::thread::scope` that creates another runtime 
may cause issues:
   
   1. The scoped thread pattern forces synchronous waiting, which defeats the 
purpose of async
   2. Creating a new runtime in a thread spawned from an existing runtime is 
unusual and may lead to unexpected behavior
   3. The `.unwrap()` calls on lines 129-130 and 133 can panic, which is 
undesirable in a library
   
   Consider simplifying this by either:
   - Requiring callers to use the async `new` method when already in a runtime
   - Using `tokio::task::spawn_blocking` instead of `std::thread::scope`
   - Documenting that this constructor should not be called from within an 
async context



##########
cpp/src/lib.rs:
##########
@@ -87,74 +84,41 @@ pub fn new_file_group_reader_with_options(
 }
 
 impl HudiFileGroupReader {
-    pub fn read_file_slice_by_base_file_path(
+    pub fn read_file_slice_by_paths(
         &self,
-        relative_path: &CxxString,
+        base_file_path: &CxxString,
+        log_file_paths: &CxxVector<CxxString>,
     ) -> *mut ffi::ArrowArrayStream {
-        let relative_path = relative_path
+        let base_file_path = base_file_path
             .to_str()
             .expect("Failed to convert CxxString to str: Invalid UTF-8 
sequence");
 
-        let record_batch = self
-            .inner
-            .read_file_slice_by_base_file_path_blocking(relative_path)
-            .expect("Failed to read file batch");
-        let schema = record_batch.schema();
-
-        create_raw_pointer_for_record_batches(vec![record_batch], schema)
-    }
-
-    pub fn read_file_slice(&self, file_slice: &HudiFileSlice) -> *mut 
ffi::ArrowArrayStream {
-        let record_batch = self
-            .inner
-            .read_file_slice_blocking(&file_slice.inner)
-            .expect("Failed to read file slice");
-        let schema = record_batch.schema();
+        let log_file_paths: Vec<String> = log_file_paths
+            .iter()
+            .map(|path| {
+                path.to_str()
+                    .expect("Failed to convert CxxString to str: Invalid UTF-8 
sequence")
+                    .to_string()
+            })
+            .collect();
+
+        let options = ReadOptions::new();
+        let stream = rt()
+            .block_on(
+                self.inner
+                    .read_file_slice_by_paths(base_file_path, log_file_paths, 
&options),
+            )
+            .expect("Failed to read file slice by paths");

Review Comment:
   Multiple `.expect()` calls on lines 93-94, 99-102, and 110-111 can panic and
cause the process to abort. In FFI code, panics are particularly dangerous as
they can corrupt the calling C++ program's state. These calls should instead
return proper error results through the FFI boundary using Result types or
error codes.



##########
crates/core/src/file_group/reader.rs:
##########
@@ -305,24 +320,333 @@ impl FileGroupReader {
             all_batches.extend(log_batches);
 
             let merger = RecordMerger::new(schema.clone(), 
self.hudi_configs.clone());
-            merger.merge_record_batches(all_batches)
+            let batch = merger.merge_record_batches(all_batches)?;
+
+            // Apply projection if specified
+            let batch = if let Some(cols) = projection {
+                Self::apply_projection(&batch, cols)?
+            } else {
+                batch
+            };
+
+            // Apply row predicate if specified
+            let batch = if let Some(predicate) = row_predicate {
+                Self::apply_row_predicate(&batch, predicate)?
+            } else {
+                batch
+            };
+
+            Ok(batch)
         }
     }
 
-    /// Same as [FileGroupReader::read_file_slice_from_paths], but blocking.
-    pub fn read_file_slice_from_paths_blocking<I, S>(
+    /// Reads a file slice as a stream of record batches with [ReadOptions].
+    ///
+    /// This is the primary read API for FileGroupReader. It returns a stream 
that
+    /// yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a 
true
+    /// streaming iterator from the underlying parquet file, yielding batches 
as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the 
collect-and-merge approach
+    /// and yields the merged result as a single batch.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation:
+    ///     - `projection`: Column names to project (select)
+    ///     - `row_predicate`: Row-level filter predicate
+    ///     - `batch_size`: Target rows per batch
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is 
`'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice(&file_slice, &options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+        let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+        if base_file_only {
+            // True streaming: return the parquet stream directly
+            self.read_parquet_file_stream(&base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths_with_opts(&base_file_path, 
log_file_paths, options)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }

Review Comment:
   The streaming implementation for MOR tables with log files (lines 403-408) 
falls back to collecting all data into memory first via 
`read_file_slice_from_paths_with_opts`, then yields it as a single batch. This 
defeats the purpose of streaming for MOR tables and could cause memory issues 
with large datasets.
   
   Consider documenting this limitation clearly in the function documentation, 
or better yet, implementing true streaming for MOR tables in a future iteration.



##########
crates/core/tests/table_read_tests.rs:
##########
@@ -35,21 +37,23 @@ mod v6_tables {
     mod snapshot_queries {
         use super::*;
 
-        #[test]
-        fn test_empty_table() -> Result<()> {
+        #[tokio::test]
+        async fn test_empty_table() -> Result<()> {
             for base_url in SampleTable::V6Empty.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
-                let records = 
hudi_table.read_snapshot_blocking(empty_filters())?;
+                let hudi_table = Table::new(base_url.path()).await?;
+                let stream = 
hudi_table.read_snapshot(ReadOptions::new()).await?;
+                let records: Vec<_> = stream.try_collect().await?;
                 assert!(records.is_empty());
             }
             Ok(())
         }

Review Comment:
   The test at line 41 is named `test_empty_table`, but several tests in this
file share that name (lines 41, 312, 418). While Rust allows duplicate test
names across different modules, it makes it harder to identify which specific
test failed when running tests. Consider using more descriptive names such as
`test_v6_empty_table_snapshot`, `test_v6_empty_table_incremental`,
`test_v8_empty_table_snapshot`, etc.



##########
crates/core/src/table/mod.rs:
##########
@@ -642,153 +414,198 @@ impl Table {
         )
     }
 
-    /// Get all the latest records in the table.
+    /// Create a [FileGroupReader] for this table.
     ///
     /// # Arguments
-    ///     * `filters` - Partition filters to apply.
-    pub async fn read_snapshot<I, S>(&self, filters: I) -> 
Result<Vec<RecordBatch>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
-            let filters = from_str_tuples(filters)?;
-            self.read_snapshot_internal(timestamp, &filters).await
-        } else {
-            Ok(Vec::new())
+    /// * `options` - Read options for configuring the reader
+    pub fn create_file_group_reader(&self, options: &ReadOptions) -> 
Result<FileGroupReader> {
+        let mut read_configs: Vec<(HudiReadConfig, String)> = vec![];
+
+        if let Some(ref ts) = options.as_of_timestamp {
+            let formatted = format_timestamp(ts, &self.timezone())?;
+            read_configs.push((HudiReadConfig::FileGroupEndTimestamp, 
formatted));
+        } else if let Some(ts) = 
self.timeline.get_latest_commit_timestamp_as_option() {
+            read_configs.push((HudiReadConfig::FileGroupEndTimestamp, 
ts.to_string()));
         }
-    }
 
-    /// Same as [Table::read_snapshot], but blocking.
-    pub fn read_snapshot_blocking<I, S>(&self, filters: I) -> 
Result<Vec<RecordBatch>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.read_snapshot(filters).await })
+        if let Some(ref ts) = options.start_timestamp {
+            let formatted = format_timestamp(ts, &self.timezone())?;
+            read_configs.push((HudiReadConfig::FileGroupStartTimestamp, 
formatted));
+        }
+
+        if let Some(batch_size) = options.batch_size {
+            read_configs.push((HudiReadConfig::StreamBatchSize, 
batch_size.to_string()));
+        }
+
+        self.create_file_group_reader_with_options(read_configs)
     }
 
-    /// Get all the records in the table at a given timestamp.
+    /// Read records from the table as a stream using [ReadOptions].
+    ///
+    /// This is the primary read API. It returns a stream that yields record 
batches
+    /// as they are read, enabling memory-efficient processing of large tables.
+    ///
+    /// For COW tables or read-optimized mode, each parquet file is streamed 
batch-by-batch.
+    /// For MOR tables with log files, each file slice is read, merged, and 
yielded as a batch.
     ///
     /// # Arguments
-    ///     * `timestamp` - The timestamp which records associated with.
-    ///     * `filters` - Partition filters to apply.
-    pub async fn read_snapshot_as_of<I, S>(
+    ///     * `options` - Read options for configuring the read operation:
+    ///         - `as_of_timestamp`: If set, reads data as of this timestamp 
(time travel)
+    ///         - `partition_filter`: Filters for partition pruning
+    ///         - `projection`: Column names to project (select)
+    ///         - `row_predicate`: Row-level filter predicate
+    ///         - `batch_size`: Target rows per batch (effective for 
COW/read-optimized)
+    ///
+    /// # Example
+    ///
+    /// ```ignore
+    /// use hudi_core::table::{Table, ReadOptions};
+    /// use hudi_core::expr::filter::col;
+    /// use futures::StreamExt;
+    ///
+    /// // Read latest snapshot
+    /// let mut stream = table.read_snapshot(ReadOptions::new()).await?;
+    ///
+    /// // Read with partition filter and column projection
+    /// let options = ReadOptions::new()
+    ///     .with_partition_filter(col("date").eq("2024-01-01"))
+    ///     .with_column(["id", "name", "value"])
+    ///     .with_batch_size(4096);
+    ///
+    /// let mut stream = table.read_snapshot(options).await?;
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    ///
+    /// // Time travel: read snapshot as of a specific timestamp
+    /// let options = ReadOptions::new().as_of("2024-01-01T12:00:00Z");
+    /// let mut stream = table.read_snapshot(options).await?;
+    /// ```
+    pub async fn read_snapshot(
         &self,
-        timestamp: &str,
-        filters: I,
-    ) -> Result<Vec<RecordBatch>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        let timestamp = format_timestamp(timestamp, &self.timezone())?;
-        let filters = from_str_tuples(filters)?;
-        self.read_snapshot_internal(&timestamp, &filters).await
-    }
+        options: ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        // Determine the timestamp: use as_of_timestamp from options, or latest
+        let timestamp = if let Some(ref ts) = options.as_of_timestamp {
+            format_timestamp(ts, &self.timezone())?
+        } else if let Some(ts) = 
self.timeline.get_latest_commit_timestamp_as_option() {
+            ts.to_string()
+        } else {
+            return Ok(Box::pin(futures::stream::empty()));
+        };
 
-    /// Same as [Table::read_snapshot_as_of], but blocking.
-    pub fn read_snapshot_as_of_blocking<I, S>(
-        &self,
-        timestamp: &str,
-        filters: I,
-    ) -> Result<Vec<RecordBatch>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.read_snapshot_as_of(timestamp, 
filters).await })
+        self.read_snapshot_internal(&timestamp, options).await
     }
 
     async fn read_snapshot_internal(
         &self,
         timestamp: &str,
-        filters: &[Filter],
-    ) -> Result<Vec<RecordBatch>> {
-        let file_slices = self.get_file_slices_internal(timestamp, 
filters).await?;
-        let fg_reader = self.create_file_group_reader_with_options([(
-            HudiReadConfig::FileGroupEndTimestamp,
-            timestamp,
-        )])?;
-        let batches =
-            futures::future::try_join_all(file_slices.iter().map(|f| 
fg_reader.read_file_slice(f)))
-                .await?;
-        Ok(batches)
+        options: ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let file_slices = self
+            .get_file_slices_internal(timestamp, &options.partition_filter)
+            .await?;
+
+        // Build the base read config options
+        let mut read_configs: Vec<(HudiReadConfig, String)> =
+            vec![(HudiReadConfig::FileGroupEndTimestamp, 
timestamp.to_string())];
+
+        // Add batch size if specified
+        if let Some(batch_size) = options.batch_size {
+            read_configs.push((HudiReadConfig::StreamBatchSize, 
batch_size.to_string()));
+        }
+
+        let fg_reader = 
self.create_file_group_reader_with_options(read_configs)?;
+
+        // Create a stream that yields batches from all file slices
+        // Each file slice produces a 'static stream, which we flatten
+        let stream = futures::stream::iter(file_slices)
+            .then(move |file_slice| {
+                let fg_reader = fg_reader.clone();
+                let options = options.clone();
+                async move {
+                    fg_reader.read_file_slice(&file_slice, &options).await
+                }
+            })
+            .try_flatten();
+
+        Ok(Box::pin(stream))
     }

Review Comment:
   The streaming read APIs return `BoxStream<'static, Result<RecordBatch>>`, 
but for truly large datasets, users may want to apply backpressure or limit 
memory usage. Consider documenting best practices for:
   
   1. How to limit concurrent processing of batches
   2. How to apply backpressure when consumers are slower than producers
   3. How the `'static` lifetime affects memory usage
   
   Additionally, consider whether a bounded buffer or backpressure mechanism 
should be built into the stream.



##########
crates/core/src/file_group/reader.rs:
##########
@@ -305,24 +320,333 @@ impl FileGroupReader {
             all_batches.extend(log_batches);
 
             let merger = RecordMerger::new(schema.clone(), 
self.hudi_configs.clone());
-            merger.merge_record_batches(all_batches)
+            let batch = merger.merge_record_batches(all_batches)?;
+
+            // Apply projection if specified
+            let batch = if let Some(cols) = projection {
+                Self::apply_projection(&batch, cols)?
+            } else {
+                batch
+            };
+
+            // Apply row predicate if specified
+            let batch = if let Some(predicate) = row_predicate {
+                Self::apply_row_predicate(&batch, predicate)?
+            } else {
+                batch
+            };
+
+            Ok(batch)
         }
     }
 
-    /// Same as [FileGroupReader::read_file_slice_from_paths], but blocking.
-    pub fn read_file_slice_from_paths_blocking<I, S>(
+    /// Reads a file slice as a stream of record batches with [ReadOptions].
+    ///
+    /// This is the primary read API for FileGroupReader. It returns a stream 
that
+    /// yields record batches as they are read.
+    ///
+    /// For COW tables or read-optimized mode (base file only), this returns a 
true
+    /// streaming iterator from the underlying parquet file, yielding batches 
as they
+    /// are read without loading all data into memory.
+    ///
+    /// For MOR tables with log files, this falls back to the 
collect-and-merge approach
+    /// and yields the merged result as a single batch.
+    ///
+    /// # Arguments
+    /// * `file_slice` - The file slice to read.
+    /// * `options` - Read options for configuring the read operation:
+    ///     - `projection`: Column names to project (select)
+    ///     - `row_predicate`: Row-level filter predicate
+    ///     - `batch_size`: Target rows per batch
+    ///
+    /// # Returns
+    /// A stream of record batches. The stream owns all necessary data and is 
`'static`.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new().with_batch_size(4096);
+    /// let mut stream = reader.read_file_slice(&file_slice, &options).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice(
+        &self,
+        file_slice: &FileSlice,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+
+        let use_read_optimized: bool = self
+            .hudi_configs
+            .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+            .into();
+        let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+        if base_file_only {
+            // True streaming: return the parquet stream directly
+            self.read_parquet_file_stream(&base_file_path, options).await
+        } else {
+            // Fallback: collect + merge, then yield as single-item stream
+            let batch = self
+                .read_file_slice_from_paths_with_opts(&base_file_path, 
log_file_paths, options)
+                .await?;
+            Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+        }
+    }
+
+    /// Reads a file slice specified by file paths as a stream of record 
batches.
+    ///
+    /// This is a convenience method that constructs a [FileSlice] from the 
given paths
+    /// and delegates to [Self::read_file_slice].
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the base file (e.g., 
"city=chennai/file.parquet")
+    /// * `log_file_paths` - Optional iterator of relative paths to log files
+    /// * `options` - Read options for configuring the read operation
+    ///
+    /// # Returns
+    /// A stream of record batches.
+    ///
+    /// # Example
+    /// ```ignore
+    /// use futures::StreamExt;
+    ///
+    /// let options = ReadOptions::new();
+    /// let mut stream = reader.read_file_slice_by_paths(
+    ///     "city=chennai/file-id-0_0-7-24_20240101120000.parquet",
+    ///     [".file-id-0_20240101120000.log.1_0-51-115"],
+    ///     &options,
+    /// ).await?;
+    ///
+    /// while let Some(result) = stream.next().await {
+    ///     let batch = result?;
+    ///     // Process batch...
+    /// }
+    /// ```
+    pub async fn read_file_slice_by_paths<I, S>(
         &self,
         base_file_path: &str,
         log_file_paths: I,
-    ) -> Result<RecordBatch>
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>>
     where
         I: IntoIterator<Item = S>,
         S: AsRef<str>,
     {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(self.read_file_slice_from_paths(base_file_path, 
log_file_paths))
+        use std::path::Path;
+
+        // Parse base file path to extract partition path and file name
+        let path = Path::new(base_file_path);
+        let partition_path = path
+            .parent()
+            .map(|p| p.to_string_lossy().to_string())
+            .unwrap_or_default();
+        let base_file_name = path
+            .file_name()
+            .ok_or_else(|| {
+                CoreError::FileGroup(format!(
+                    "Cannot extract file name from base file path: 
{base_file_path}"
+                ))
+            })?
+            .to_string_lossy();
+
+        // Create a FileGroup from the base file
+        let mut file_group = 
FileGroup::new_with_base_file_name(&base_file_name, &partition_path)?;
+
+        // Add log files if provided
+        for log_file_path in log_file_paths {
+            let log_path = Path::new(log_file_path.as_ref());
+            let log_file_name = log_path
+                .file_name()
+                .ok_or_else(|| {
+                    CoreError::FileGroup(format!(
+                        "Cannot extract file name from log file path: {}",
+                        log_file_path.as_ref()
+                    ))
+                })?
+                .to_string_lossy();
+            file_group.add_log_file_from_name(&log_file_name)?;
+        }
+
+        // Get the file slice (there should be exactly one after adding base 
file)
+        let file_slice = file_group
+            .file_slices
+            .values()
+            .next()
+            .ok_or_else(|| CoreError::FileGroup("No file slice found in file 
group".to_string()))?
+            .clone();
+
+        // Delegate to the primary read API
+        self.read_file_slice(&file_slice, options).await
+    }
+
+    /// Reads a parquet file as a stream of record batches.
+    ///
+    /// This method propagates the underlying parquet stream, applying 
Hudi-specific
+    /// filtering (commit time filtering for incremental reads) to each batch.
+    /// The returned stream owns all necessary data and is `'static`.
+    async fn read_parquet_file_stream(
+        &self,
+        relative_path: &str,
+        options: &ReadOptions,
+    ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+        let batch_size = options.batch_size.unwrap_or(8192);
+        let parquet_options = ParquetReadOptions::with_batch_size(batch_size);
+
+        let stream = self
+            .storage
+            .get_parquet_file_stream(relative_path, parquet_options)
+            .await
+            .map_err(|e| ReadFileSliceError(format!("Failed to read path 
{relative_path}: {e:?}")))?;
+
+        // Clone data for the closure so the stream is 'static
+        let reader = self.clone();
+        let projection = options.projection.clone();
+        let row_predicate = options.row_predicate.clone();
+
+        let mapped_stream = stream.map(move |result| {
+            let batch = result
+                .map_err(|e| ReadFileSliceError(format!("Failed to read batch: 
{e:?}")))?;
+
+            // Apply Hudi commit time filtering
+            let batch = if let Some(mask) = 
reader.create_filtering_mask_for_base_file_records(&batch)? {
+                filter_record_batch(&batch, &mask)
+                    .map_err(|e| ReadFileSliceError(format!("Failed to filter 
records: {e:?}")))?
+            } else {
+                batch
+            };
+
+            // Apply column projection if specified
+            let batch = if let Some(ref cols) = projection {
+                FileGroupReader::apply_projection(&batch, cols)?
+            } else {
+                batch
+            };
+
+            // Apply row predicate if specified
+            let batch = if let Some(ref predicate) = row_predicate {
+                FileGroupReader::apply_row_predicate(&batch, predicate)?
+            } else {
+                batch
+            };
+
+            Ok(batch)
+        });
+
+        Ok(Box::pin(mapped_stream))
+    }
+
+    /// Creates an instant range for log file scanning using ReadOptions.
+    fn create_instant_range_for_log_file_scan_with_opts(&self, options: 
&ReadOptions) -> InstantRange {
+        let timezone = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::TimelineTimezone)
+            .into();
+
+        // Use as_of_timestamp from options if provided, otherwise fall back 
to config
+        let start_timestamp = self
+            .hudi_configs
+            .try_get(HudiReadConfig::FileGroupStartTimestamp)
+            .map(|v| -> String { v.into() });
+
+        let end_timestamp = options
+            .as_of_timestamp
+            .clone()
+            .or_else(|| {
+                self.hudi_configs
+                    .try_get(HudiReadConfig::FileGroupEndTimestamp)
+                    .map(|v| -> String { v.into() })
+            });
+
+        InstantRange::new(timezone, start_timestamp, end_timestamp, false, 
true)
+    }
+
+    /// Applies column projection to a record batch.
+    fn apply_projection(batch: &RecordBatch, columns: &[String]) -> 
Result<RecordBatch> {
+        let schema = batch.schema();
+        let indices: Vec<usize> = columns
+            .iter()
+            .filter_map(|col| schema.index_of(col).ok())
+            .collect();
+
+        if indices.is_empty() {
+            return Err(ReadFileSliceError(
+                "No matching columns found for projection".to_string(),
+            ));
+        }
+
+        batch
+            .project(&indices)
+            .map_err(|e| ReadFileSliceError(format!("Failed to project 
columns: {e:?}")))
+    }
+
    /// Applies a row predicate filter to a record batch.
    ///
    /// NOTE(review): only simple single-field predicates are supported — the
    /// comparison is evaluated against exactly one column, looked up by
    /// `predicate.field_name`. If `Filter` ever supports compound or nested
    /// conditions, this must be extended or validated accordingly — confirm
    /// against the `Filter` type's definition.
    ///
    /// # Arguments
    /// * `batch` - The record batch to filter.
    /// * `predicate` - The single-field filter to evaluate.
    ///
    /// # Errors
    /// Returns [ReadFileSliceError] if the predicate's column is absent from
    /// the batch or if filtering fails; errors from building the schema-aware
    /// filter are propagated as-is.
    fn apply_row_predicate(batch: &RecordBatch, predicate: &Filter) -> Result<RecordBatch> {
        let schema = batch.schema();
        // Bind the untyped filter to this batch's schema; failures propagate via `?`.
        let schemable = SchemableFilter::try_from((predicate.clone(), schema.as_ref()))?;

        // Get the column to filter on
        let column = batch.column_by_name(&predicate.field_name).ok_or_else(|| {
            ReadFileSliceError(format!("Column '{}' not found for predicate", predicate.field_name))
        })?;

        // Apply the comparison to get a boolean mask
        // ("apply_comparsion" is the existing API spelling — kept as-is.)
        let mask = schemable.apply_comparsion(column)?;

        // Filter the batch
        filter_record_batch(batch, &mask)
            .map_err(|e| ReadFileSliceError(format!("Failed to apply row predicate: {e:?}")))
    }

Review Comment:
   The `apply_row_predicate` method only supports single-field predicates. Line 
603 gets a single column by `predicate.field_name`, but the Filter type might 
support more complex predicates with multiple fields or nested conditions. If a 
compound filter is passed, this will fail or only partially apply the filter.
   
   Consider adding validation to ensure only simple single-field predicates are 
supported, or extend the implementation to handle compound filters properly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to