This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b658a98 feat: support incremental read MOR tables (#258)
b658a98 is described below
commit b658a9853f800090a7f4c57d6504898158de53ca
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 21 20:57:35 2025 -0600
feat: support incremental read MOR tables (#258)
Adjust the arguments to pass the time range to file group readers to support
incremental reads for MOR tables.
---
crates/core/src/file_group/builder.rs | 51 +++++--
crates/core/src/table/mod.rs | 167 +++++++++++----------
crates/core/src/timeline/selector.rs | 48 +++++-
...v6_simplekeygen_nonhivestyle_overwritetable.sql | 101 +++++++++++++
...v6_simplekeygen_nonhivestyle_overwritetable.zip | Bin 0 -> 48409 bytes
5 files changed, 269 insertions(+), 98 deletions(-)
diff --git a/crates/core/src/file_group/builder.rs
b/crates/core/src/file_group/builder.rs
index 4f73aa1..035c681 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -44,21 +44,42 @@ pub fn build_file_groups(commit_metadata: &Map<String,
Value>) -> Result<HashSet
.and_then(|v| v.as_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in
write stats".into()))?;
- let path = stat
- .get("path")
- .and_then(|v| v.as_str())
- .ok_or_else(|| CoreError::CommitMetadata("Invalid path in
write stats".into()))?;
-
- let file_name = Path::new(path)
- .file_name()
- .and_then(|name| name.to_str())
- .ok_or_else(|| CoreError::CommitMetadata("Invalid file name in
path".into()))?;
-
- let file_group = FileGroup::new_with_base_file_name(
- file_id.to_string(),
- partition.clone(),
- file_name,
- )?;
+ let mut file_group = FileGroup::new(file_id.to_string(),
partition.clone());
+
+ if let Some(base_file_name) = stat.get("baseFile") {
+ let base_file_name = base_file_name
+ .as_str()
+ .ok_or_else(|| CoreError::CommitMetadata("Invalid base
file name".into()))?;
+ file_group.add_base_file_from_name(base_file_name)?;
+
+ if let Some(log_file_names) = stat.get("logFiles") {
+ let log_file_names =
log_file_names.as_array().ok_or_else(|| {
+ CoreError::CommitMetadata("Invalid log files
array".into())
+ })?;
+ for log_file_name in log_file_names {
+ let log_file_name =
log_file_name.as_str().ok_or_else(|| {
+ CoreError::CommitMetadata("Invalid log file
name".into())
+ })?;
+ file_group.add_log_file_from_name(log_file_name)?;
+ }
+ } else {
+ return Err(CoreError::CommitMetadata(
+ "Missing log files in write stats".into(),
+ ));
+ }
+ } else {
+ let path = stat.get("path").and_then(|v|
v.as_str()).ok_or_else(|| {
+ CoreError::CommitMetadata("Invalid path in write
stats".into())
+ })?;
+
+ let file_name = Path::new(path)
+ .file_name()
+ .and_then(|name| name.to_str())
+ .ok_or_else(|| CoreError::CommitMetadata("Invalid file
name in path".into()))?;
+
+ file_group.add_base_file_from_name(file_name)?;
+ }
+
file_groups.insert(file_group);
}
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 31e25e0..7f5a82c 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -326,7 +326,7 @@ impl Table {
end_timestamp: Option<&str>,
) -> Result<Vec<RecordBatch>> {
// If the end timestamp is not provided, use the latest commit
timestamp.
- let Some(as_of_timestamp) =
+ let Some(end_timestamp) =
end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp())
else {
return Ok(Vec::new());
@@ -336,10 +336,10 @@ impl Table {
let mut file_slices: Vec<FileSlice> = Vec::new();
let file_groups = self
.timeline
- .get_incremental_file_groups(Some(start_timestamp),
Some(as_of_timestamp))
+ .get_incremental_file_groups(Some(start_timestamp),
Some(end_timestamp))
.await?;
for file_group in file_groups {
- if let Some(file_slice) =
file_group.get_file_slice_as_of(as_of_timestamp) {
+ if let Some(file_slice) =
file_group.get_file_slice_as_of(end_timestamp) {
file_slices.push(file_slice.clone());
}
}
@@ -347,13 +347,14 @@ impl Table {
// Read incremental records from the file slices.
let filters = &[
FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
-
FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp),
+
FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
];
let fg_reader =
self.create_file_group_reader_with_filters(filters,
MetaField::schema().as_ref())?;
let base_file_only = self.get_table_type() ==
TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
- let instant_range = InstantRange::up_to(as_of_timestamp, &timezone);
+ let instant_range =
+ InstantRange::within_open_closed(start_timestamp, end_timestamp,
&timezone);
let batches = futures::future::try_join_all(
file_slices
.iter()
@@ -992,93 +993,95 @@ mod tests {
mod test_incremental_queries {
use super::super::*;
- use arrow_array::{Array, StringArray};
+ use arrow_select::concat::concat_batches;
use hudi_tests::SampleTable;
- use std::collections::HashSet;
#[tokio::test]
async fn test_empty() -> Result<()> {
- let base_url = SampleTable::V6Empty.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await?;
-
- let records = hudi_table.read_incremental_records("0",
None).await?;
- assert!(records.is_empty());
-
+ for base_url in SampleTable::V6Empty.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_incremental_records("0",
None).await?;
+ assert!(records.is_empty())
+ }
Ok(())
}
#[tokio::test]
async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()>
{
- let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await?;
-
- // read records changed from the first commit (exclusive) to the
second commit (inclusive)
- let records = hudi_table
- .read_incremental_records("20240707001301554",
Some("20240707001302376"))
- .await?;
- assert_eq!(records.len(), 2);
- assert_eq!(records[0].num_rows(), 1);
- assert_eq!(records[1].num_rows(), 1);
-
- // verify the partition paths
- let partition_paths = StringArray::from(
- arrow::compute::concat(&[
-
records[0].column_by_name("_hoodie_partition_path").unwrap(),
-
records[1].column_by_name("_hoodie_partition_path").unwrap(),
- ])?
- .to_data(),
- );
- let actual_partition_paths =
- HashSet::<&str>::from_iter(partition_paths.iter().map(|s|
s.unwrap()));
- let expected_partition_paths = HashSet::from_iter(vec!["10",
"30"]);
- assert_eq!(actual_partition_paths, expected_partition_paths);
-
- // verify the file names
- let file_names = StringArray::from(
- arrow::compute::concat(&[
- records[0].column_by_name("_hoodie_file_name").unwrap(),
- records[1].column_by_name("_hoodie_file_name").unwrap(),
- ])?
- .to_data(),
- );
- let actual_file_names =
- HashSet::<&str>::from_iter(file_names.iter().map(|s|
s.unwrap()));
- let expected_file_names = HashSet::from_iter(vec![
-
"d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
-
"4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
- ]);
- assert_eq!(actual_file_names, expected_file_names);
-
- // read records changed from the first commit (exclusive) to
- // the latest (an insert overwrite table's replacecommit)
- let records = hudi_table
- .read_incremental_records("20240707001301554", None)
- .await?;
- assert_eq!(records.len(), 1);
- assert_eq!(records[0].num_rows(), 1);
-
- // verify the partition paths
- let actual_partition_paths = StringArray::from(
- records[0]
- .column_by_name("_hoodie_partition_path")
- .unwrap()
- .to_data(),
- );
- let expected_partition_paths = StringArray::from(vec!["30"]);
- assert_eq!(actual_partition_paths, expected_partition_paths);
-
- // verify the file names
- let actual_file_names = StringArray::from(
- records[0]
- .column_by_name("_hoodie_file_name")
- .unwrap()
- .to_data(),
- );
- let expected_file_names = StringArray::from(vec![
-
"ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
- ]);
- assert_eq!(actual_file_names, expected_file_names);
+ for base_url in
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
+ let hudi_table = Table::new(base_url.path()).await?;
+ let commit_timestamps = hudi_table
+ .timeline
+ .completed_commits
+ .iter()
+ .map(|i| i.timestamp.as_str())
+ .collect::<Vec<_>>();
+ assert_eq!(commit_timestamps.len(), 3);
+ let first_commit = commit_timestamps[0];
+ let second_commit = commit_timestamps[1];
+ let third_commit = commit_timestamps[2];
+
+ // read records changed from the beginning to the 1st commit
+ let records = hudi_table
+ .read_incremental_records("19700101000000",
Some(first_commit))
+ .await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol",
true),],
+ "Should return 3 records inserted in the 1st commit"
+ );
+ // read records changed from the 1st to the 2nd commit
+ let records = hudi_table
+ .read_incremental_records(first_commit,
Some(second_commit))
+ .await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![(1, "Alice", false), (4, "Diana", true),],
+ "Should return 2 records inserted or updated in the 2nd
commit"
+ );
+
+ // read records changed from the 2nd to the 3rd commit
+ let records = hudi_table
+ .read_incremental_records(second_commit,
Some(third_commit))
+ .await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![(4, "Diana", false),],
+ "Should return 1 record insert-overwritten in the 3rd
commit"
+ );
+
+ // read records changed from the 1st commit
+ let records = hudi_table
+ .read_incremental_records(first_commit, None)
+ .await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+ let sample_data =
SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![(4, "Diana", false),],
+ "Should return 1 record insert-overwritten in the 3rd
commit"
+ );
+
+ // read records changed from the 3rd commit
+ let records = hudi_table
+ .read_incremental_records(third_commit, None)
+ .await?;
+ assert!(
+ records.is_empty(),
+ "Should return 0 record as it's the latest commit"
+ );
+ }
Ok(())
}
}
diff --git a/crates/core/src/timeline/selector.rs
b/crates/core/src/timeline/selector.rs
index ff117a7..3799f64 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -25,6 +25,7 @@ use crate::Result;
use chrono::{DateTime, Utc};
use std::sync::Arc;
+#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct InstantRange {
timezone: String,
@@ -51,7 +52,7 @@ impl InstantRange {
}
}
- /// Create a new [InstantRange] with an end timestamp inclusive.
+ /// Create a new [InstantRange] with a closed end timestamp range.
pub fn up_to(end_timestamp: &str, timezone: &str) -> Self {
Self::new(
timezone.to_string(),
@@ -62,6 +63,28 @@ impl InstantRange {
)
}
+ /// Create a new [InstantRange] with an open timestamp range.
+ pub fn within(start_timestamp: &str, end_timestamp: &str, timezone: &str)
-> Self {
+ Self::new(
+ timezone.to_string(),
+ Some(start_timestamp.to_string()),
+ Some(end_timestamp.to_string()),
+ false,
+ false,
+ )
+ }
+
+ /// Create a new [InstantRange] with an open start and closed end
timestamp range.
+ pub fn within_open_closed(start_timestamp: &str, end_timestamp: &str,
timezone: &str) -> Self {
+ Self::new(
+ timezone.to_string(),
+ Some(start_timestamp.to_string()),
+ Some(end_timestamp.to_string()),
+ false,
+ true,
+ )
+ }
+
pub fn timezone(&self) -> &str {
&self.timezone
}
@@ -304,6 +327,29 @@ mod tests {
assert!(range.end_inclusive);
}
+ #[test]
+ fn test_within() {
+ let range = InstantRange::within("20240101000000000",
"20241231235959999", "UTC");
+
+ assert_eq!(range.timezone(), "UTC");
+ assert_eq!(range.start_timestamp.as_deref(),
Some("20240101000000000"));
+ assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+ assert!(!range.start_inclusive);
+ assert!(!range.end_inclusive);
+ }
+
+ #[test]
+ fn test_within_open_closed() {
+ let range =
+ InstantRange::within_open_closed("20240101000000000",
"20241231235959999", "UTC");
+
+ assert_eq!(range.timezone(), "UTC");
+ assert_eq!(range.start_timestamp.as_deref(),
Some("20240101000000000"));
+ assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+ assert!(!range.start_inclusive);
+ assert!(range.end_inclusive);
+ }
+
#[test]
fn test_is_in_range_inclusive_bounds() {
let range = InstantRange::new(
diff --git
a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
new file mode 100644
index 0000000..d86e5df
--- /dev/null
+++
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v6_simplekeygen_nonhivestyle_overwritetable (
+ id INT,
+ name STRING,
+ isActive BOOLEAN,
+ shortField SHORT,
+ intField INT,
+ longField LONG,
+ floatField FLOAT,
+ doubleField DOUBLE,
+ decimalField DECIMAL(10,5),
+ dateField DATE,
+ timestampField TIMESTAMP,
+ binaryField BINARY,
+ arrayField
ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>, -- Array of structs
+ mapField MAP<STRING,
STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>,
-- Map with struct values
+ structField STRUCT<
+ field1: STRING,
+ field2: INT,
+ child_struct: STRUCT<
+ child_field1: DOUBLE,
+ child_field2: BOOLEAN
+ >
+ >,
+ byteField BYTE
+)
+ USING HUDI
+ LOCATION
'/opt/data/external_tables/v6_simplekeygen_nonhivestyle_overwritetable'
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ preCombineField = 'longField',
+ 'hoodie.metadata.enable' = 'false',
+ 'hoodie.datasource.write.hive_style_partitioning' = 'false',
+ 'hoodie.datasource.write.drop.partition.columns' = 'false',
+ 'hoodie.table.log.file.format' = 'PARQUET',
+ 'hoodie.logfile.data.block.format' = 'parquet',
+ 'hoodie.datasource.write.record.merger.impls' =
'org.apache.hudi.HoodieSparkRecordMerger',
+ 'hoodie.parquet.small.file.limit' = '0'
+)
+PARTITIONED BY (byteField);
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+ (1, 'Alice', true, 300, 15000,
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE),
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+ ARRAY(STRUCT('red', 100),
STRUCT('blue', 200), STRUCT('green', 300)),
+ MAP('key1', STRUCT(123.456,
true), 'key2', STRUCT(789.012, false)),
+ STRUCT('Alice', 30,
STRUCT(123.456, true)),
+ 10
+ ),
+ (2, 'Bob', false, 100, 25000,
9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE),
CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+ ARRAY(STRUCT('yellow', 400),
STRUCT('purple', 500)),
+ MAP('key3', STRUCT(234.567,
true), 'key4', STRUCT(567.890, false)),
+ STRUCT('Bob', 40,
STRUCT(789.012, false)),
+ 20
+ ),
+ (3, 'Carol', true, 200, 35000,
1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE),
CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS
BINARY),
+ ARRAY(STRUCT('black', 600),
STRUCT('white', 700), STRUCT('pink', 800)),
+ MAP('key5', STRUCT(345.678,
true), 'key6', STRUCT(654.321, false)),
+ STRUCT('Carol', 25,
STRUCT(456.789, true)),
+ 10
+ );
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+ (1, 'Alice', false, 300, 15000,
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE),
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+ ARRAY(STRUCT('red', 100),
STRUCT('blue', 200), STRUCT('green', 300)),
+ MAP('key1', STRUCT(123.456,
true), 'key2', STRUCT(789.012, false)),
+ STRUCT('Alice', 30,
STRUCT(123.456, true)),
+ 10
+ ),
+ (4, 'Diana', true, 500, 45000,
987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE),
CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+ ARRAY(STRUCT('orange', 900),
STRUCT('gray', 1000)),
+ MAP('key7', STRUCT(456.789,
true), 'key8', STRUCT(123.456, false)),
+ STRUCT('Diana', 50,
STRUCT(987.654, true)),
+ 30
+ );
+
+INSERT OVERWRITE TABLE v6_simplekeygen_nonhivestyle_overwritetable SELECT
+ 4, 'Diana', false, 500, 45000,
987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE),
CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+ ARRAY(STRUCT('orange', 900),
STRUCT('gray', 1000)),
+ MAP('key7', STRUCT(456.789,
true), 'key8', STRUCT(123.456, false)),
+ STRUCT('Diana', 50,
STRUCT(987.654, true)),
+ 30
+ ;
diff --git
a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.zip
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.zip
new file mode 100644
index 0000000..afd3208
Binary files /dev/null and
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.zip
differ