This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0effb24 fix: improve api to get file slices splits (#185)
0effb24 is described below
commit 0effb24a6a7c196f948fd43a741557342ee1128d
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Nov 19 21:05:52 2024 -1000
fix: improve api to get file slices splits (#185)
---
crates/core/src/table/mod.rs | 36 ++++++++++++++++++++++++++++++++----
crates/datafusion/src/lib.rs | 2 +-
python/hudi/__init__.py | 13 +++++++++----
python/hudi/_internal.pyi | 4 ++--
python/src/internal.rs | 4 ++--
python/tests/test_table_read.py | 2 +-
6 files changed, 47 insertions(+), 14 deletions(-)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0d25db7..4d69bbf 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -66,7 +66,7 @@
//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
//! let hudi_table = Table::new(base_uri.path()).await.unwrap();
//! let file_slices = hudi_table
-//! .split_file_slices(2, &[])
+//! .get_file_slices_splits(2, &[])
//! .await.unwrap();
//! // define every parquet task reader how many slice
//! let mut parquet_file_groups: Vec<Vec<String>> = Vec::new();
@@ -186,14 +186,22 @@ impl Table {
Ok(Schema::new(partition_fields))
}
- /// Split the file into a specified number of parts
- pub async fn split_file_slices(
+ /// Get all the [FileSlice]s in the table.
+ ///
+ /// The file slices are split into `n` chunks.
+ ///
+ /// If the [AsOfTimestamp] configuration is set, the file slices at the specified timestamp will be returned.
+ pub async fn get_file_slices_splits(
&self,
n: usize,
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
- let n = std::cmp::max(1, n);
let file_slices = self.get_file_slices(filters).await?;
+ if file_slices.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ let n = std::cmp::max(1, n);
let chunk_size = (file_slices.len() + n - 1) / n;
Ok(file_slices
@@ -614,6 +622,26 @@ mod tests {
assert_eq!(batches.num_columns(), 21);
}
+ #[tokio::test]
+ async fn empty_hudi_table_get_file_slices_splits() {
+ let base_url = TestTable::V6Empty.url();
+
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices_splits = hudi_table.get_file_slices_splits(2, &[]).await.unwrap();
+ assert!(file_slices_splits.is_empty());
+ }
+
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_splits() {
+ let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
+
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices_splits = hudi_table.get_file_slices_splits(2, &[]).await.unwrap();
+ assert_eq!(file_slices_splits.len(), 2);
+ assert_eq!(file_slices_splits[0].len(), 2);
+ assert_eq!(file_slices_splits[1].len(), 1);
+ }
+
#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = TestTable::V6Nonpartitioned.url();
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 6dbc624..739cc90 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -105,7 +105,7 @@ impl TableProvider for HudiDataSource {
let file_slices = self
.table
// TODO: implement supports_filters_pushdown() to pass filters to Hudi table API
- .split_file_slices(self.get_input_partitions(), &[])
+ .get_file_slices_splits(self.get_input_partitions(), &[])
.await
.map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index fae5cab..ae6b9aa 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -16,8 +16,13 @@
# under the License.
-from hudi._internal import HudiFileGroupReader as HudiFileGroupReader
-from hudi._internal import HudiFileSlice as HudiFileSlice
-from hudi._internal import HudiTable as HudiTable
+from hudi._internal import HudiFileGroupReader, HudiFileSlice, HudiTable
from hudi._internal import __version__ as __version__
-from hudi.table.builder import HudiTableBuilder as HudiTableBuilder
+from hudi.table.builder import HudiTableBuilder
+
+__all__ = [
+ "HudiFileGroupReader",
+ "HudiFileSlice",
+ "HudiTable",
+ "HudiTableBuilder",
+]
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 081a229..48a9eef 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -141,11 +141,11 @@ class HudiTable:
Dict[str, str]: A dictionary of storage options.
"""
...
- def split_file_slices(
+ def get_file_slices_splits(
self, n: int, filters: Optional[List[Tuple[str, str, str]]]
) -> List[List[HudiFileSlice]]:
"""
- Splits the file slices into 'n' parts, optionally filtered by given filters.
+ Retrieves all file slices in the Hudi table in 'n' splits, optionally filtered by given filters.
Parameters:
n (int): The number of parts to split the file slices into.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 113c6c9..37201cd 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -157,7 +157,7 @@ impl HudiTable {
}
#[pyo3(signature = (n, filters=None))]
- fn split_file_slices(
+ fn get_file_slices_splits(
&self,
n: usize,
filters: Option<Vec<(String, String, String)>>,
@@ -166,7 +166,7 @@ impl HudiTable {
py.allow_threads(|| {
let file_slices = rt().block_on(
self.inner
- .split_file_slices(n, vec_to_slice!(filters.unwrap_or_default())),
+ .get_file_slices_splits(n, vec_to_slice!(filters.unwrap_or_default())),
)?;
Ok(file_slices
.iter()
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index fbed1fd..baebdff 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -80,7 +80,7 @@ def test_read_table_can_read_from_batches(get_sample_table):
assert t.num_rows == 1
assert t.num_columns == 11
- file_slices_gen = iter(table.split_file_slices(2))
+ file_slices_gen = iter(table.get_file_slices_splits(2))
assert len(next(file_slices_gen)) == 3
assert len(next(file_slices_gen)) == 2