Copilot commented on code in PR #507:
URL: https://github.com/apache/hudi-rs/pull/507#discussion_r2659241275
##########
python/src/internal.rs:
##########
@@ -348,154 +355,106 @@ impl HudiTable {
})
}
- #[pyo3(signature = (num_splits, filters=None))]
- fn get_file_slices_splits(
- &self,
- num_splits: usize,
- filters: Option<Vec<(String, String, String)>>,
- py: Python,
- ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
- py.detach(|| {
- let file_slices = rt()
- .block_on(
- self.inner
- .get_file_slices_splits(num_splits,
filters.unwrap_or_default()),
- )
- .map_err(PythonError::from)?;
- Ok(file_slices
- .iter()
- .map(|inner_vec|
inner_vec.iter().map(HudiFileSlice::from).collect())
- .collect())
- })
- }
-
- #[pyo3(signature = (num_splits, timestamp, filters=None))]
- fn get_file_slices_splits_as_of(
- &self,
- num_splits: usize,
- timestamp: &str,
- filters: Option<Vec<(String, String, String)>>,
- py: Python,
- ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
- py.detach(|| {
- let file_slices = rt()
- .block_on(self.inner.get_file_slices_splits_as_of(
- num_splits,
- timestamp,
- filters.unwrap_or_default(),
- ))
- .map_err(PythonError::from)?;
- Ok(file_slices
- .iter()
- .map(|inner_vec|
inner_vec.iter().map(HudiFileSlice::from).collect())
- .collect())
- })
- }
-
- #[pyo3(signature = (filters=None))]
+ /// Get file slices with optional partition filters and time-travel
timestamp.
+ ///
+ /// Args:
+ /// filters: Optional partition filter tuples in (field, op, value)
format
+ /// timestamp: Optional timestamp for time-travel queries
+ #[pyo3(signature = (filters=None, timestamp=None))]
fn get_file_slices(
&self,
filters: Option<Vec<(String, String, String)>>,
+ timestamp: Option<&str>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
py.detach(|| {
+ let options = build_read_options(filters,
timestamp).map_err(PythonError::from)?;
let file_slices = rt()
-
.block_on(self.inner.get_file_slices(filters.unwrap_or_default()))
+ .block_on(self.inner.get_file_slices(options))
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
}
- #[pyo3(signature = (timestamp, filters=None))]
- fn get_file_slices_as_of(
- &self,
- timestamp: &str,
- filters: Option<Vec<(String, String, String)>>,
- py: Python,
- ) -> PyResult<Vec<HudiFileSlice>> {
- py.detach(|| {
- let file_slices = rt()
- .block_on(
- self.inner
- .get_file_slices_as_of(timestamp,
filters.unwrap_or_default()),
- )
- .map_err(PythonError::from)?;
- Ok(file_slices.iter().map(HudiFileSlice::from).collect())
- })
- }
-
- #[pyo3(signature = (start_timestamp=None, end_timestamp=None))]
- fn get_file_slices_between(
+ /// Get file slices for incremental reads between timestamps.
+ ///
+ /// Args:
+ /// start_timestamp: Start timestamp (exclusive)
+ /// end_timestamp: Optional end timestamp (inclusive), defaults to
latest
+ #[pyo3(signature = (start_timestamp, end_timestamp=None))]
+ fn get_file_slices_incremental(
&self,
- start_timestamp: Option<&str>,
+ start_timestamp: &str,
end_timestamp: Option<&str>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
py.detach(|| {
+ let mut options =
ReadOptions::new().from_timestamp(start_timestamp);
+ if let Some(end_ts) = end_timestamp {
+ options = options.to_timestamp(end_ts);
+ }
let file_slices = rt()
- .block_on(
- self.inner
- .get_file_slices_between(start_timestamp,
end_timestamp),
- )
+ .block_on(self.inner.get_file_slices_incremental(options))
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(HudiFileSlice::from).collect())
})
}
- #[pyo3(signature = (options=None))]
- fn create_file_group_reader_with_options(
- &self,
- options: Option<HashMap<String, String>>,
- ) -> PyResult<HudiFileGroupReader> {
+ /// Create a file group reader for this table.
+ fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
+ let options = ReadOptions::new();
let fg_reader = self
.inner
- .create_file_group_reader_with_options(options.unwrap_or_default())
+ .create_file_group_reader(&options)
.map_err(PythonError::from)?;
Ok(HudiFileGroupReader { inner: fg_reader })
Review Comment:
The Python bindings create ReadOptions with `ReadOptions::new()` on lines 99
and 405, but ignore all the options that could be passed from Python (like
batch_size, projection, row predicates). This means Python users cannot control
these aspects of file slice reading.
Consider either:
1. Adding parameters to expose these options to Python
2. Documenting that these options are not yet supported in the Python API
##########
crates/core/src/table/mod.rs:
##########
@@ -619,8 +391,8 @@ impl Table {
Ok(file_slices)
}
- /// Create a [FileGroupReader] using the [Table]'s Hudi configs, and
overwriting options.
- pub fn create_file_group_reader_with_options<I, K, V>(
+ /// Create a [FileGroupReader] using the [Table]'s Hudi configs.
+ pub(crate) fn create_file_group_reader_with_options<I, K, V>(
Review Comment:
The visibility of `create_file_group_reader_with_options` has been changed
from `pub` to `pub(crate)` on line 395. This is a breaking API change that
could affect external crates that depend on this function. If this is
intentional for the new design where users should use
`create_file_group_reader` instead, consider documenting this breaking change
or providing a deprecation period.
```suggestion
pub fn create_file_group_reader_with_options<I, K, V>(
```
##########
crates/core/src/file_group/reader.rs:
##########
@@ -275,10 +282,18 @@ impl FileGroupReader {
.into();
let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+ let projection = options.projection.as_deref();
+ let row_predicate = options.row_predicate.as_ref();
+
+ // For now, only projection is supported for base file only reads
+ // Log file merging with projection/predicate is more complex and
deferred
Review Comment:
The comment on line 288 states "For now, only projection is supported for
base file only reads" but the code immediately below (lines 291-292) appears to
pass both projection and row_predicate to `read_parquet_file_with_opts`. This
suggests either:
1. The comment is outdated and row predicates are actually supported
2. The implementation is incomplete and row predicates are incorrectly
passed but not properly handled
Review and update either the comment or the implementation to ensure they
match.
```suggestion
        // For base file only reads, apply projection and row predicates via read_parquet_file_with_opts.
        // For MOR tables with log files, projection and predicates are applied after merging log records.
```
##########
cpp/src/lib.rs:
##########
@@ -87,74 +84,41 @@ pub fn new_file_group_reader_with_options(
}
impl HudiFileGroupReader {
- pub fn read_file_slice_by_base_file_path(
+ pub fn read_file_slice_by_paths(
&self,
- relative_path: &CxxString,
+ base_file_path: &CxxString,
+ log_file_paths: &CxxVector<CxxString>,
) -> *mut ffi::ArrowArrayStream {
- let relative_path = relative_path
+ let base_file_path = base_file_path
.to_str()
.expect("Failed to convert CxxString to str: Invalid UTF-8
sequence");
- let record_batch = self
- .inner
- .read_file_slice_by_base_file_path_blocking(relative_path)
- .expect("Failed to read file batch");
- let schema = record_batch.schema();
-
- create_raw_pointer_for_record_batches(vec![record_batch], schema)
- }
-
- pub fn read_file_slice(&self, file_slice: &HudiFileSlice) -> *mut
ffi::ArrowArrayStream {
- let record_batch = self
- .inner
- .read_file_slice_blocking(&file_slice.inner)
- .expect("Failed to read file slice");
- let schema = record_batch.schema();
+ let log_file_paths: Vec<String> = log_file_paths
+ .iter()
+ .map(|path| {
+ path.to_str()
+ .expect("Failed to convert CxxString to str: Invalid UTF-8
sequence")
+ .to_string()
+ })
+ .collect();
+
+ let options = ReadOptions::new();
+ let stream = rt()
+ .block_on(
+ self.inner
+ .read_file_slice_by_paths(base_file_path, log_file_paths,
&options),
+ )
+ .expect("Failed to read file slice by paths");
+
+ let batches: Vec<_> = rt()
+ .block_on(stream.try_collect())
+ .expect("Failed to collect record batches");
+
+ if batches.is_empty() {
+ panic!("No record batches read from file slice");
Review Comment:
In the C++ bindings, line 118 panics if no record batches are read. This is
problematic because:
1. Empty results are valid (e.g., empty table, all rows filtered out)
2. Panicking in FFI code can cause undefined behavior
3. The error should be propagated to C++ via the FFI boundary properly
Instead of panicking, handle the empty case gracefully by either returning
an empty stream or a proper error result that C++ can handle.
```suggestion
            // No record batches were read; return a null pointer so the caller can
            // treat this as an empty result or error as appropriate.
return std::ptr::null_mut();
```
##########
crates/core/src/file_group/reader.rs:
##########
@@ -91,21 +93,54 @@ impl FileGroupReader {
K: AsRef<str>,
V: Into<String>,
{
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async {
- let mut resolver = OptionResolver::new_with_options(base_uri,
options);
- resolver.resolve_options().await?;
- let hudi_configs =
Arc::new(HudiConfigs::new(resolver.hudi_options));
- let storage =
- Storage::new(Arc::new(resolver.storage_options),
hudi_configs.clone())?;
-
- Ok(Self {
- hudi_configs,
- storage,
- })
+ // Collect options upfront so we can move them across thread
boundaries if needed
+ let options_vec: Vec<(String, String)> = options
+ .into_iter()
+ .map(|(k, v)| (k.as_ref().to_string(), v.into()))
+ .collect();
+ let base_uri = base_uri.to_string();
+
+ // Helper function to run the async resolution
+ async fn resolve_reader(
+ base_uri: String,
+ options_vec: Vec<(String, String)>,
+ ) -> Result<FileGroupReader> {
+ let mut resolver = OptionResolver::new_with_options(&base_uri,
options_vec);
+ resolver.resolve_options().await?;
+ let hudi_configs =
Arc::new(HudiConfigs::new(resolver.hudi_options));
+ let storage =
+ Storage::new(Arc::new(resolver.storage_options),
hudi_configs.clone())?;
+
+ Ok(FileGroupReader {
+ hudi_configs,
+ storage,
})
+ }
+
+ // Check if we're already in a tokio runtime
+ match tokio::runtime::Handle::try_current() {
+ Ok(_handle) => {
+ // We're inside a runtime, spawn a new thread with its own
runtime
+ std::thread::scope(|s| {
+ s.spawn(|| {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ .block_on(resolve_reader(base_uri, options_vec))
+ })
+ .join()
+ .unwrap()
+ })
+ }
+ Err(_) => {
+ // No runtime, create one
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(resolve_reader(base_uri, options_vec))
+ }
+ }
Review Comment:
The runtime creation and thread spawning logic in the `new` method is
complex and could have concurrency issues. When already in a tokio runtime,
spawning a new thread with `std::thread::scope` that creates another runtime
may cause issues:
1. The scoped thread pattern forces synchronous waiting, which defeats the
purpose of async
2. Creating a new runtime in a thread spawned from an existing runtime is
unusual and may lead to unexpected behavior
3. The `.unwrap()` calls on lines 129-130 and 133 can panic, which is
undesirable in a library
Consider simplifying this by either:
- Requiring callers to use the async `new` method when already in a runtime
- Using `tokio::task::spawn_blocking` instead of `std::thread::scope`
- Documenting that this constructor should not be called from within an
async context
##########
cpp/src/lib.rs:
##########
@@ -87,74 +84,41 @@ pub fn new_file_group_reader_with_options(
}
impl HudiFileGroupReader {
- pub fn read_file_slice_by_base_file_path(
+ pub fn read_file_slice_by_paths(
&self,
- relative_path: &CxxString,
+ base_file_path: &CxxString,
+ log_file_paths: &CxxVector<CxxString>,
) -> *mut ffi::ArrowArrayStream {
- let relative_path = relative_path
+ let base_file_path = base_file_path
.to_str()
.expect("Failed to convert CxxString to str: Invalid UTF-8
sequence");
- let record_batch = self
- .inner
- .read_file_slice_by_base_file_path_blocking(relative_path)
- .expect("Failed to read file batch");
- let schema = record_batch.schema();
-
- create_raw_pointer_for_record_batches(vec![record_batch], schema)
- }
-
- pub fn read_file_slice(&self, file_slice: &HudiFileSlice) -> *mut
ffi::ArrowArrayStream {
- let record_batch = self
- .inner
- .read_file_slice_blocking(&file_slice.inner)
- .expect("Failed to read file slice");
- let schema = record_batch.schema();
+ let log_file_paths: Vec<String> = log_file_paths
+ .iter()
+ .map(|path| {
+ path.to_str()
+ .expect("Failed to convert CxxString to str: Invalid UTF-8
sequence")
+ .to_string()
+ })
+ .collect();
+
+ let options = ReadOptions::new();
+ let stream = rt()
+ .block_on(
+ self.inner
+ .read_file_slice_by_paths(base_file_path, log_file_paths,
&options),
+ )
+ .expect("Failed to read file slice by paths");
Review Comment:
Multiple `.expect()` calls with panics on lines 93-94, 99-102, and 110-111
can cause the process to abort. In FFI code, panics are particularly dangerous
as they can corrupt the calling C++ program's state. These should be changed to
return proper error results through the FFI boundary using Result types or
error codes.
##########
crates/core/src/file_group/reader.rs:
##########
@@ -305,24 +320,333 @@ impl FileGroupReader {
all_batches.extend(log_batches);
let merger = RecordMerger::new(schema.clone(),
self.hudi_configs.clone());
- merger.merge_record_batches(all_batches)
+ let batch = merger.merge_record_batches(all_batches)?;
+
+ // Apply projection if specified
+ let batch = if let Some(cols) = projection {
+ Self::apply_projection(&batch, cols)?
+ } else {
+ batch
+ };
+
+ // Apply row predicate if specified
+ let batch = if let Some(predicate) = row_predicate {
+ Self::apply_row_predicate(&batch, predicate)?
+ } else {
+ batch
+ };
+
+ Ok(batch)
}
}
- /// Same as [FileGroupReader::read_file_slice_from_paths], but blocking.
- pub fn read_file_slice_from_paths_blocking<I, S>(
+ /// Reads a file slice as a stream of record batches with [ReadOptions].
+ ///
+ /// This is the primary read API for FileGroupReader. It returns a stream
that
+ /// yields record batches as they are read.
+ ///
+ /// For COW tables or read-optimized mode (base file only), this returns a
true
+ /// streaming iterator from the underlying parquet file, yielding batches
as they
+ /// are read without loading all data into memory.
+ ///
+ /// For MOR tables with log files, this falls back to the
collect-and-merge approach
+ /// and yields the merged result as a single batch.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation:
+ /// - `projection`: Column names to project (select)
+ /// - `row_predicate`: Row-level filter predicate
+ /// - `batch_size`: Target rows per batch
+ ///
+ /// # Returns
+ /// A stream of record batches. The stream owns all necessary data and is
`'static`.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = reader.read_file_slice(&file_slice, &options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// ```
+ pub async fn read_file_slice(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ let base_file_path = file_slice.base_file_relative_path()?;
+ let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+ file_slice
+ .log_files
+ .iter()
+ .map(|log_file| file_slice.log_file_relative_path(log_file))
+ .collect::<Result<Vec<String>>>()?
+ } else {
+ vec![]
+ };
+
+ let use_read_optimized: bool = self
+ .hudi_configs
+ .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+ .into();
+ let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+ if base_file_only {
+ // True streaming: return the parquet stream directly
+ self.read_parquet_file_stream(&base_file_path, options).await
+ } else {
+ // Fallback: collect + merge, then yield as single-item stream
+ let batch = self
+ .read_file_slice_from_paths_with_opts(&base_file_path,
log_file_paths, options)
+ .await?;
+ Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+ }
Review Comment:
The streaming implementation for MOR tables with log files (lines 403-408)
falls back to collecting all data into memory first via
`read_file_slice_from_paths_with_opts`, then yields it as a single batch. This
defeats the purpose of streaming for MOR tables and could cause memory issues
with large datasets.
Consider documenting this limitation clearly in the function documentation,
or better yet, implementing true streaming for MOR tables in a future iteration.
##########
crates/core/tests/table_read_tests.rs:
##########
@@ -35,21 +37,23 @@ mod v6_tables {
mod snapshot_queries {
use super::*;
- #[test]
- fn test_empty_table() -> Result<()> {
+ #[tokio::test]
+ async fn test_empty_table() -> Result<()> {
for base_url in SampleTable::V6Empty.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records =
hudi_table.read_snapshot_blocking(empty_filters())?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let stream =
hudi_table.read_snapshot(ReadOptions::new()).await?;
+ let records: Vec<_> = stream.try_collect().await?;
assert!(records.is_empty());
}
Ok(())
}
Review Comment:
The test at line 41 is named `test_empty_table` but there are multiple tests
with this same name in the file (lines 41, 312, 418). While Rust allows this in
different modules, it makes it harder to identify which specific test failed
when running tests. Consider using more descriptive names like
`test_v6_empty_table_snapshot`, `test_v6_empty_table_incremental`,
`test_v8_empty_table_snapshot` etc.
##########
crates/core/src/table/mod.rs:
##########
@@ -642,153 +414,198 @@ impl Table {
)
}
- /// Get all the latest records in the table.
+ /// Create a [FileGroupReader] for this table.
///
/// # Arguments
- /// * `filters` - Partition filters to apply.
- pub async fn read_snapshot<I, S>(&self, filters: I) ->
Result<Vec<RecordBatch>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
- let filters = from_str_tuples(filters)?;
- self.read_snapshot_internal(timestamp, &filters).await
- } else {
- Ok(Vec::new())
+ /// * `options` - Read options for configuring the reader
+ pub fn create_file_group_reader(&self, options: &ReadOptions) ->
Result<FileGroupReader> {
+ let mut read_configs: Vec<(HudiReadConfig, String)> = vec![];
+
+ if let Some(ref ts) = options.as_of_timestamp {
+ let formatted = format_timestamp(ts, &self.timezone())?;
+ read_configs.push((HudiReadConfig::FileGroupEndTimestamp,
formatted));
+ } else if let Some(ts) =
self.timeline.get_latest_commit_timestamp_as_option() {
+ read_configs.push((HudiReadConfig::FileGroupEndTimestamp,
ts.to_string()));
}
- }
- /// Same as [Table::read_snapshot], but blocking.
- pub fn read_snapshot_blocking<I, S>(&self, filters: I) ->
Result<Vec<RecordBatch>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.read_snapshot(filters).await })
+ if let Some(ref ts) = options.start_timestamp {
+ let formatted = format_timestamp(ts, &self.timezone())?;
+ read_configs.push((HudiReadConfig::FileGroupStartTimestamp,
formatted));
+ }
+
+ if let Some(batch_size) = options.batch_size {
+ read_configs.push((HudiReadConfig::StreamBatchSize,
batch_size.to_string()));
+ }
+
+ self.create_file_group_reader_with_options(read_configs)
}
- /// Get all the records in the table at a given timestamp.
+ /// Read records from the table as a stream using [ReadOptions].
+ ///
+ /// This is the primary read API. It returns a stream that yields record
batches
+ /// as they are read, enabling memory-efficient processing of large tables.
+ ///
+ /// For COW tables or read-optimized mode, each parquet file is streamed
batch-by-batch.
+ /// For MOR tables with log files, each file slice is read, merged, and
yielded as a batch.
///
/// # Arguments
- /// * `timestamp` - The timestamp which records associated with.
- /// * `filters` - Partition filters to apply.
- pub async fn read_snapshot_as_of<I, S>(
+ /// * `options` - Read options for configuring the read operation:
+ /// - `as_of_timestamp`: If set, reads data as of this timestamp
(time travel)
+ /// - `partition_filter`: Filters for partition pruning
+ /// - `projection`: Column names to project (select)
+ /// - `row_predicate`: Row-level filter predicate
+ /// - `batch_size`: Target rows per batch (effective for
COW/read-optimized)
+ ///
+ /// # Example
+ ///
+ /// ```ignore
+ /// use hudi_core::table::{Table, ReadOptions};
+ /// use hudi_core::expr::filter::col;
+ /// use futures::StreamExt;
+ ///
+ /// // Read latest snapshot
+ /// let mut stream = table.read_snapshot(ReadOptions::new()).await?;
+ ///
+ /// // Read with partition filter and column projection
+ /// let options = ReadOptions::new()
+ /// .with_partition_filter(col("date").eq("2024-01-01"))
+ /// .with_column(["id", "name", "value"])
+ /// .with_batch_size(4096);
+ ///
+ /// let mut stream = table.read_snapshot(options).await?;
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ ///
+ /// // Time travel: read snapshot as of a specific timestamp
+ /// let options = ReadOptions::new().as_of("2024-01-01T12:00:00Z");
+ /// let mut stream = table.read_snapshot(options).await?;
+ /// ```
+ pub async fn read_snapshot(
&self,
- timestamp: &str,
- filters: I,
- ) -> Result<Vec<RecordBatch>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- let timestamp = format_timestamp(timestamp, &self.timezone())?;
- let filters = from_str_tuples(filters)?;
- self.read_snapshot_internal(×tamp, &filters).await
- }
+ options: ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ // Determine the timestamp: use as_of_timestamp from options, or latest
+ let timestamp = if let Some(ref ts) = options.as_of_timestamp {
+ format_timestamp(ts, &self.timezone())?
+ } else if let Some(ts) =
self.timeline.get_latest_commit_timestamp_as_option() {
+ ts.to_string()
+ } else {
+ return Ok(Box::pin(futures::stream::empty()));
+ };
- /// Same as [Table::read_snapshot_as_of], but blocking.
- pub fn read_snapshot_as_of_blocking<I, S>(
- &self,
- timestamp: &str,
- filters: I,
- ) -> Result<Vec<RecordBatch>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.read_snapshot_as_of(timestamp,
filters).await })
+ self.read_snapshot_internal(×tamp, options).await
}
async fn read_snapshot_internal(
&self,
timestamp: &str,
- filters: &[Filter],
- ) -> Result<Vec<RecordBatch>> {
- let file_slices = self.get_file_slices_internal(timestamp,
filters).await?;
- let fg_reader = self.create_file_group_reader_with_options([(
- HudiReadConfig::FileGroupEndTimestamp,
- timestamp,
- )])?;
- let batches =
- futures::future::try_join_all(file_slices.iter().map(|f|
fg_reader.read_file_slice(f)))
- .await?;
- Ok(batches)
+ options: ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ let file_slices = self
+ .get_file_slices_internal(timestamp, &options.partition_filter)
+ .await?;
+
+ // Build the base read config options
+ let mut read_configs: Vec<(HudiReadConfig, String)> =
+ vec![(HudiReadConfig::FileGroupEndTimestamp,
timestamp.to_string())];
+
+ // Add batch size if specified
+ if let Some(batch_size) = options.batch_size {
+ read_configs.push((HudiReadConfig::StreamBatchSize,
batch_size.to_string()));
+ }
+
+ let fg_reader =
self.create_file_group_reader_with_options(read_configs)?;
+
+ // Create a stream that yields batches from all file slices
+ // Each file slice produces a 'static stream, which we flatten
+ let stream = futures::stream::iter(file_slices)
+ .then(move |file_slice| {
+ let fg_reader = fg_reader.clone();
+ let options = options.clone();
+ async move {
+ fg_reader.read_file_slice(&file_slice, &options).await
+ }
+ })
+ .try_flatten();
+
+ Ok(Box::pin(stream))
}
Review Comment:
The streaming read APIs return `BoxStream<'static, Result<RecordBatch>>`,
but for truly large datasets, users may want to apply backpressure or limit
memory usage. Consider documenting best practices for:
1. How to limit concurrent processing of batches
2. How to apply backpressure when consumers are slower than producers
3. Memory implications of the 'static lifetime
Additionally, consider whether a bounded buffer or backpressure mechanism
should be built into the stream.
##########
crates/core/src/file_group/reader.rs:
##########
@@ -305,24 +320,333 @@ impl FileGroupReader {
all_batches.extend(log_batches);
let merger = RecordMerger::new(schema.clone(),
self.hudi_configs.clone());
- merger.merge_record_batches(all_batches)
+ let batch = merger.merge_record_batches(all_batches)?;
+
+ // Apply projection if specified
+ let batch = if let Some(cols) = projection {
+ Self::apply_projection(&batch, cols)?
+ } else {
+ batch
+ };
+
+ // Apply row predicate if specified
+ let batch = if let Some(predicate) = row_predicate {
+ Self::apply_row_predicate(&batch, predicate)?
+ } else {
+ batch
+ };
+
+ Ok(batch)
}
}
- /// Same as [FileGroupReader::read_file_slice_from_paths], but blocking.
- pub fn read_file_slice_from_paths_blocking<I, S>(
+ /// Reads a file slice as a stream of record batches with [ReadOptions].
+ ///
+ /// This is the primary read API for FileGroupReader. It returns a stream
that
+ /// yields record batches as they are read.
+ ///
+ /// For COW tables or read-optimized mode (base file only), this returns a
true
+ /// streaming iterator from the underlying parquet file, yielding batches
as they
+ /// are read without loading all data into memory.
+ ///
+ /// For MOR tables with log files, this falls back to the
collect-and-merge approach
+ /// and yields the merged result as a single batch.
+ ///
+ /// # Arguments
+ /// * `file_slice` - The file slice to read.
+ /// * `options` - Read options for configuring the read operation:
+ /// - `projection`: Column names to project (select)
+ /// - `row_predicate`: Row-level filter predicate
+ /// - `batch_size`: Target rows per batch
+ ///
+ /// # Returns
+ /// A stream of record batches. The stream owns all necessary data and is
`'static`.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ ///
+ /// let options = ReadOptions::new().with_batch_size(4096);
+ /// let mut stream = reader.read_file_slice(&file_slice, &options).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// ```
+ pub async fn read_file_slice(
+ &self,
+ file_slice: &FileSlice,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ let base_file_path = file_slice.base_file_relative_path()?;
+ let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+ file_slice
+ .log_files
+ .iter()
+ .map(|log_file| file_slice.log_file_relative_path(log_file))
+ .collect::<Result<Vec<String>>>()?
+ } else {
+ vec![]
+ };
+
+ let use_read_optimized: bool = self
+ .hudi_configs
+ .get_or_default(HudiReadConfig::UseReadOptimizedMode)
+ .into();
+ let base_file_only = log_file_paths.is_empty() || use_read_optimized;
+
+ if base_file_only {
+ // True streaming: return the parquet stream directly
+ self.read_parquet_file_stream(&base_file_path, options).await
+ } else {
+ // Fallback: collect + merge, then yield as single-item stream
+ let batch = self
+ .read_file_slice_from_paths_with_opts(&base_file_path,
log_file_paths, options)
+ .await?;
+ Ok(Box::pin(futures::stream::once(async { Ok(batch) })))
+ }
+ }
+
+ /// Reads a file slice specified by file paths as a stream of record
batches.
+ ///
+ /// This is a convenience method that constructs a [FileSlice] from the
given paths
+ /// and delegates to [Self::read_file_slice].
+ ///
+ /// # Arguments
+ /// * `base_file_path` - Relative path to the base file (e.g.,
"city=chennai/file.parquet")
+ /// * `log_file_paths` - Optional iterator of relative paths to log files
+ /// * `options` - Read options for configuring the read operation
+ ///
+ /// # Returns
+ /// A stream of record batches.
+ ///
+ /// # Example
+ /// ```ignore
+ /// use futures::StreamExt;
+ ///
+ /// let options = ReadOptions::new();
+ /// let mut stream = reader.read_file_slice_by_paths(
+ /// "city=chennai/file-id-0_0-7-24_20240101120000.parquet",
+ /// [".file-id-0_20240101120000.log.1_0-51-115"],
+ /// &options,
+ /// ).await?;
+ ///
+ /// while let Some(result) = stream.next().await {
+ /// let batch = result?;
+ /// // Process batch...
+ /// }
+ /// ```
+ pub async fn read_file_slice_by_paths<I, S>(
&self,
base_file_path: &str,
log_file_paths: I,
- ) -> Result<RecordBatch>
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(self.read_file_slice_from_paths(base_file_path,
log_file_paths))
+ use std::path::Path;
+
+ // Parse base file path to extract partition path and file name
+ let path = Path::new(base_file_path);
+ let partition_path = path
+ .parent()
+ .map(|p| p.to_string_lossy().to_string())
+ .unwrap_or_default();
+ let base_file_name = path
+ .file_name()
+ .ok_or_else(|| {
+ CoreError::FileGroup(format!(
+ "Cannot extract file name from base file path:
{base_file_path}"
+ ))
+ })?
+ .to_string_lossy();
+
+ // Create a FileGroup from the base file
+ let mut file_group =
FileGroup::new_with_base_file_name(&base_file_name, &partition_path)?;
+
+ // Add log files if provided
+ for log_file_path in log_file_paths {
+ let log_path = Path::new(log_file_path.as_ref());
+ let log_file_name = log_path
+ .file_name()
+ .ok_or_else(|| {
+ CoreError::FileGroup(format!(
+ "Cannot extract file name from log file path: {}",
+ log_file_path.as_ref()
+ ))
+ })?
+ .to_string_lossy();
+ file_group.add_log_file_from_name(&log_file_name)?;
+ }
+
+ // Get the file slice (there should be exactly one after adding base
file)
+ let file_slice = file_group
+ .file_slices
+ .values()
+ .next()
+ .ok_or_else(|| CoreError::FileGroup("No file slice found in file
group".to_string()))?
+ .clone();
+
+ // Delegate to the primary read API
+ self.read_file_slice(&file_slice, options).await
+ }
+
+ /// Reads a parquet file as a stream of record batches.
+ ///
+ /// This method propagates the underlying parquet stream, applying
Hudi-specific
+ /// filtering (commit time filtering for incremental reads) to each batch.
+ /// The returned stream owns all necessary data and is `'static`.
+ async fn read_parquet_file_stream(
+ &self,
+ relative_path: &str,
+ options: &ReadOptions,
+ ) -> Result<BoxStream<'static, Result<RecordBatch>>> {
+ let batch_size = options.batch_size.unwrap_or(8192);
+ let parquet_options = ParquetReadOptions::with_batch_size(batch_size);
+
+ let stream = self
+ .storage
+ .get_parquet_file_stream(relative_path, parquet_options)
+ .await
+ .map_err(|e| ReadFileSliceError(format!("Failed to read path
{relative_path}: {e:?}")))?;
+
+ // Clone data for the closure so the stream is 'static
+ let reader = self.clone();
+ let projection = options.projection.clone();
+ let row_predicate = options.row_predicate.clone();
+
+ let mapped_stream = stream.map(move |result| {
+ let batch = result
+ .map_err(|e| ReadFileSliceError(format!("Failed to read batch:
{e:?}")))?;
+
+ // Apply Hudi commit time filtering
+ let batch = if let Some(mask) =
reader.create_filtering_mask_for_base_file_records(&batch)? {
+ filter_record_batch(&batch, &mask)
+ .map_err(|e| ReadFileSliceError(format!("Failed to filter
records: {e:?}")))?
+ } else {
+ batch
+ };
+
+ // Apply column projection if specified
+ let batch = if let Some(ref cols) = projection {
+ FileGroupReader::apply_projection(&batch, cols)?
+ } else {
+ batch
+ };
+
+ // Apply row predicate if specified
+ let batch = if let Some(ref predicate) = row_predicate {
+ FileGroupReader::apply_row_predicate(&batch, predicate)?
+ } else {
+ batch
+ };
+
+ Ok(batch)
+ });
+
+ Ok(Box::pin(mapped_stream))
+ }
+
+ /// Creates an instant range for log file scanning using ReadOptions.
+ fn create_instant_range_for_log_file_scan_with_opts(&self, options:
&ReadOptions) -> InstantRange {
+ let timezone = self
+ .hudi_configs
+ .get_or_default(HudiTableConfig::TimelineTimezone)
+ .into();
+
+ // Use as_of_timestamp from options if provided, otherwise fall back
to config
+ let start_timestamp = self
+ .hudi_configs
+ .try_get(HudiReadConfig::FileGroupStartTimestamp)
+ .map(|v| -> String { v.into() });
+
+ let end_timestamp = options
+ .as_of_timestamp
+ .clone()
+ .or_else(|| {
+ self.hudi_configs
+ .try_get(HudiReadConfig::FileGroupEndTimestamp)
+ .map(|v| -> String { v.into() })
+ });
+
+ InstantRange::new(timezone, start_timestamp, end_timestamp, false,
true)
+ }
+
+ /// Applies column projection to a record batch.
+ fn apply_projection(batch: &RecordBatch, columns: &[String]) ->
Result<RecordBatch> {
+ let schema = batch.schema();
+ let indices: Vec<usize> = columns
+ .iter()
+ .filter_map(|col| schema.index_of(col).ok())
+ .collect();
+
+ if indices.is_empty() {
+ return Err(ReadFileSliceError(
+ "No matching columns found for projection".to_string(),
+ ));
+ }
+
+ batch
+ .project(&indices)
+ .map_err(|e| ReadFileSliceError(format!("Failed to project
columns: {e:?}")))
+ }
+
+ /// Applies a row predicate filter to a record batch.
+ fn apply_row_predicate(batch: &RecordBatch, predicate: &Filter) ->
Result<RecordBatch> {
+ let schema = batch.schema();
+ let schemable = SchemableFilter::try_from((predicate.clone(),
schema.as_ref()))?;
+
+ // Get the column to filter on
+ let column = batch.column_by_name(&predicate.field_name).ok_or_else(||
{
+ ReadFileSliceError(format!("Column '{}' not found for predicate",
predicate.field_name))
+ })?;
+
+ // Apply the comparison to get a boolean mask
+ let mask = schemable.apply_comparsion(column)?;
+
+ // Filter the batch
+ filter_record_batch(batch, &mask)
+ .map_err(|e| ReadFileSliceError(format!("Failed to apply row
predicate: {e:?}")))
+ }
Review Comment:
The `apply_row_predicate` method only supports single-field predicates. Line
603 gets a single column by `predicate.field_name`, but the Filter type might
support more complex predicates with multiple fields or nested conditions. If a
compound filter is passed, this will fail or only partially apply the filter.
Consider adding validation to ensure only simple single-field predicates are
supported, or extend the implementation to handle compound filters properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]