This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b658a98  feat: support incremental read MOR tables (#258)
b658a98 is described below

commit b658a9853f800090a7f4c57d6504898158de53ca
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 21 20:57:35 2025 -0600

    feat: support incremental read MOR tables (#258)
    
    Adjust the arguments to pass time range to file group readers to support 
incremental read for MOR tables.
---
 crates/core/src/file_group/builder.rs              |  51 +++++--
 crates/core/src/table/mod.rs                       | 167 +++++++++++----------
 crates/core/src/timeline/selector.rs               |  48 +++++-
 ...v6_simplekeygen_nonhivestyle_overwritetable.sql | 101 +++++++++++++
 ...v6_simplekeygen_nonhivestyle_overwritetable.zip | Bin 0 -> 48409 bytes
 5 files changed, 269 insertions(+), 98 deletions(-)

diff --git a/crates/core/src/file_group/builder.rs 
b/crates/core/src/file_group/builder.rs
index 4f73aa1..035c681 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -44,21 +44,42 @@ pub fn build_file_groups(commit_metadata: &Map<String, 
Value>) -> Result<HashSet
                 .and_then(|v| v.as_str())
                 .ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in 
write stats".into()))?;
 
-            let path = stat
-                .get("path")
-                .and_then(|v| v.as_str())
-                .ok_or_else(|| CoreError::CommitMetadata("Invalid path in 
write stats".into()))?;
-
-            let file_name = Path::new(path)
-                .file_name()
-                .and_then(|name| name.to_str())
-                .ok_or_else(|| CoreError::CommitMetadata("Invalid file name in 
path".into()))?;
-
-            let file_group = FileGroup::new_with_base_file_name(
-                file_id.to_string(),
-                partition.clone(),
-                file_name,
-            )?;
+            let mut file_group = FileGroup::new(file_id.to_string(), 
partition.clone());
+
+            if let Some(base_file_name) = stat.get("baseFile") {
+                let base_file_name = base_file_name
+                    .as_str()
+                    .ok_or_else(|| CoreError::CommitMetadata("Invalid base 
file name".into()))?;
+                file_group.add_base_file_from_name(base_file_name)?;
+
+                if let Some(log_file_names) = stat.get("logFiles") {
+                    let log_file_names = 
log_file_names.as_array().ok_or_else(|| {
+                        CoreError::CommitMetadata("Invalid log files 
array".into())
+                    })?;
+                    for log_file_name in log_file_names {
+                        let log_file_name = 
log_file_name.as_str().ok_or_else(|| {
+                            CoreError::CommitMetadata("Invalid log file 
name".into())
+                        })?;
+                        file_group.add_log_file_from_name(log_file_name)?;
+                    }
+                } else {
+                    return Err(CoreError::CommitMetadata(
+                        "Missing log files in write stats".into(),
+                    ));
+                }
+            } else {
+                let path = stat.get("path").and_then(|v| 
v.as_str()).ok_or_else(|| {
+                    CoreError::CommitMetadata("Invalid path in write 
stats".into())
+                })?;
+
+                let file_name = Path::new(path)
+                    .file_name()
+                    .and_then(|name| name.to_str())
+                    .ok_or_else(|| CoreError::CommitMetadata("Invalid file 
name in path".into()))?;
+
+                file_group.add_base_file_from_name(file_name)?;
+            }
+
             file_groups.insert(file_group);
         }
     }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 31e25e0..7f5a82c 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -326,7 +326,7 @@ impl Table {
         end_timestamp: Option<&str>,
     ) -> Result<Vec<RecordBatch>> {
         // If the end timestamp is not provided, use the latest commit 
timestamp.
-        let Some(as_of_timestamp) =
+        let Some(end_timestamp) =
             end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp())
         else {
             return Ok(Vec::new());
@@ -336,10 +336,10 @@ impl Table {
         let mut file_slices: Vec<FileSlice> = Vec::new();
         let file_groups = self
             .timeline
-            .get_incremental_file_groups(Some(start_timestamp), 
Some(as_of_timestamp))
+            .get_incremental_file_groups(Some(start_timestamp), 
Some(end_timestamp))
             .await?;
         for file_group in file_groups {
-            if let Some(file_slice) = 
file_group.get_file_slice_as_of(as_of_timestamp) {
+            if let Some(file_slice) = 
file_group.get_file_slice_as_of(end_timestamp) {
                 file_slices.push(file_slice.clone());
             }
         }
@@ -347,13 +347,14 @@ impl Table {
         // Read incremental records from the file slices.
         let filters = &[
             
FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
-            
FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp),
+            
FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
         ];
         let fg_reader =
             self.create_file_group_reader_with_filters(filters, 
MetaField::schema().as_ref())?;
         let base_file_only = self.get_table_type() == 
TableTypeValue::CopyOnWrite;
         let timezone = self.timezone();
-        let instant_range = InstantRange::up_to(as_of_timestamp, &timezone);
+        let instant_range =
+            InstantRange::within_open_closed(start_timestamp, end_timestamp, 
&timezone);
         let batches = futures::future::try_join_all(
             file_slices
                 .iter()
@@ -992,93 +993,95 @@ mod tests {
 
     mod test_incremental_queries {
         use super::super::*;
-        use arrow_array::{Array, StringArray};
+        use arrow_select::concat::concat_batches;
         use hudi_tests::SampleTable;
-        use std::collections::HashSet;
 
         #[tokio::test]
         async fn test_empty() -> Result<()> {
-            let base_url = SampleTable::V6Empty.url_to_cow();
-            let hudi_table = Table::new(base_url.path()).await?;
-
-            let records = hudi_table.read_incremental_records("0", 
None).await?;
-            assert!(records.is_empty());
-
+            for base_url in SampleTable::V6Empty.urls() {
+                let hudi_table = Table::new(base_url.path()).await?;
+                let records = hudi_table.read_incremental_records("0", 
None).await?;
+                assert!(records.is_empty())
+            }
             Ok(())
         }
 
         #[tokio::test]
         async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> 
{
-            let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
-            let hudi_table = Table::new(base_url.path()).await?;
-
-            // read records changed from the first commit (exclusive) to the 
second commit (inclusive)
-            let records = hudi_table
-                .read_incremental_records("20240707001301554", 
Some("20240707001302376"))
-                .await?;
-            assert_eq!(records.len(), 2);
-            assert_eq!(records[0].num_rows(), 1);
-            assert_eq!(records[1].num_rows(), 1);
-
-            // verify the partition paths
-            let partition_paths = StringArray::from(
-                arrow::compute::concat(&[
-                    
records[0].column_by_name("_hoodie_partition_path").unwrap(),
-                    
records[1].column_by_name("_hoodie_partition_path").unwrap(),
-                ])?
-                .to_data(),
-            );
-            let actual_partition_paths =
-                HashSet::<&str>::from_iter(partition_paths.iter().map(|s| 
s.unwrap()));
-            let expected_partition_paths = HashSet::from_iter(vec!["10", 
"30"]);
-            assert_eq!(actual_partition_paths, expected_partition_paths);
-
-            // verify the file names
-            let file_names = StringArray::from(
-                arrow::compute::concat(&[
-                    records[0].column_by_name("_hoodie_file_name").unwrap(),
-                    records[1].column_by_name("_hoodie_file_name").unwrap(),
-                ])?
-                .to_data(),
-            );
-            let actual_file_names =
-                HashSet::<&str>::from_iter(file_names.iter().map(|s| 
s.unwrap()));
-            let expected_file_names = HashSet::from_iter(vec![
-                
"d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
-                
"4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
-            ]);
-            assert_eq!(actual_file_names, expected_file_names);
-
-            // read records changed from the first commit (exclusive) to
-            // the latest (an insert overwrite table's replacecommit)
-            let records = hudi_table
-                .read_incremental_records("20240707001301554", None)
-                .await?;
-            assert_eq!(records.len(), 1);
-            assert_eq!(records[0].num_rows(), 1);
-
-            // verify the partition paths
-            let actual_partition_paths = StringArray::from(
-                records[0]
-                    .column_by_name("_hoodie_partition_path")
-                    .unwrap()
-                    .to_data(),
-            );
-            let expected_partition_paths = StringArray::from(vec!["30"]);
-            assert_eq!(actual_partition_paths, expected_partition_paths);
-
-            // verify the file names
-            let actual_file_names = StringArray::from(
-                records[0]
-                    .column_by_name("_hoodie_file_name")
-                    .unwrap()
-                    .to_data(),
-            );
-            let expected_file_names = StringArray::from(vec![
-                
"ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
-            ]);
-            assert_eq!(actual_file_names, expected_file_names);
+            for base_url in 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
+                let hudi_table = Table::new(base_url.path()).await?;
+                let commit_timestamps = hudi_table
+                    .timeline
+                    .completed_commits
+                    .iter()
+                    .map(|i| i.timestamp.as_str())
+                    .collect::<Vec<_>>();
+                assert_eq!(commit_timestamps.len(), 3);
+                let first_commit = commit_timestamps[0];
+                let second_commit = commit_timestamps[1];
+                let third_commit = commit_timestamps[2];
+
+                // read records changed from the beginning to the 1st commit
+                let records = hudi_table
+                    .read_incremental_records("19700101000000", 
Some(first_commit))
+                    .await?;
+                let schema = &records[0].schema();
+                let records = concat_batches(schema, &records)?;
+                let sample_data = 
SampleTable::sample_data_order_by_id(&records);
+                assert_eq!(
+                    sample_data,
+                    vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", 
true),],
+                    "Should return 3 records inserted in the 1st commit"
+                );
 
+                // read records changed from the 1st to the 2nd commit
+                let records = hudi_table
+                    .read_incremental_records(first_commit, 
Some(second_commit))
+                    .await?;
+                let schema = &records[0].schema();
+                let records = concat_batches(schema, &records)?;
+                let sample_data = 
SampleTable::sample_data_order_by_id(&records);
+                assert_eq!(
+                    sample_data,
+                    vec![(1, "Alice", false), (4, "Diana", true),],
+                    "Should return 2 records inserted or updated in the 2nd 
commit"
+                );
+
+                // read records changed from the 2nd to the 3rd commit
+                let records = hudi_table
+                    .read_incremental_records(second_commit, 
Some(third_commit))
+                    .await?;
+                let schema = &records[0].schema();
+                let records = concat_batches(schema, &records)?;
+                let sample_data = 
SampleTable::sample_data_order_by_id(&records);
+                assert_eq!(
+                    sample_data,
+                    vec![(4, "Diana", false),],
+                    "Should return 1 record insert-overwritten in the 3rd 
commit"
+                );
+
+                // read records changed from the 1st commit
+                let records = hudi_table
+                    .read_incremental_records(first_commit, None)
+                    .await?;
+                let schema = &records[0].schema();
+                let records = concat_batches(schema, &records)?;
+                let sample_data = 
SampleTable::sample_data_order_by_id(&records);
+                assert_eq!(
+                    sample_data,
+                    vec![(4, "Diana", false),],
+                    "Should return 1 record insert-overwritten in the 3rd 
commit"
+                );
+
+                // read records changed from the 3rd commit
+                let records = hudi_table
+                    .read_incremental_records(third_commit, None)
+                    .await?;
+                assert!(
+                    records.is_empty(),
+                    "Should return 0 record as it's the latest commit"
+                );
+            }
             Ok(())
         }
     }
diff --git a/crates/core/src/timeline/selector.rs 
b/crates/core/src/timeline/selector.rs
index ff117a7..3799f64 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -25,6 +25,7 @@ use crate::Result;
 use chrono::{DateTime, Utc};
 use std::sync::Arc;
 
+#[allow(dead_code)]
 #[derive(Debug, Clone)]
 pub struct InstantRange {
     timezone: String,
@@ -51,7 +52,7 @@ impl InstantRange {
         }
     }
 
-    /// Create a new [InstantRange] with an end timestamp inclusive.
+    /// Create a new [InstantRange] with a closed end timestamp range.
     pub fn up_to(end_timestamp: &str, timezone: &str) -> Self {
         Self::new(
             timezone.to_string(),
@@ -62,6 +63,28 @@ impl InstantRange {
         )
     }
 
+    /// Create a new [InstantRange] with an open timestamp range.
+    pub fn within(start_timestamp: &str, end_timestamp: &str, timezone: &str) 
-> Self {
+        Self::new(
+            timezone.to_string(),
+            Some(start_timestamp.to_string()),
+            Some(end_timestamp.to_string()),
+            false,
+            false,
+        )
+    }
+
+    /// Create a new [InstantRange] with an open start and closed end 
timestamp range.
+    pub fn within_open_closed(start_timestamp: &str, end_timestamp: &str, 
timezone: &str) -> Self {
+        Self::new(
+            timezone.to_string(),
+            Some(start_timestamp.to_string()),
+            Some(end_timestamp.to_string()),
+            false,
+            true,
+        )
+    }
+
     pub fn timezone(&self) -> &str {
         &self.timezone
     }
@@ -304,6 +327,29 @@ mod tests {
         assert!(range.end_inclusive);
     }
 
+    #[test]
+    fn test_within() {
+        let range = InstantRange::within("20240101000000000", 
"20241231235959999", "UTC");
+
+        assert_eq!(range.timezone(), "UTC");
+        assert_eq!(range.start_timestamp.as_deref(), 
Some("20240101000000000"));
+        assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+        assert!(!range.start_inclusive);
+        assert!(!range.end_inclusive);
+    }
+
+    #[test]
+    fn test_within_open_closed() {
+        let range =
+            InstantRange::within_open_closed("20240101000000000", 
"20241231235959999", "UTC");
+
+        assert_eq!(range.timezone(), "UTC");
+        assert_eq!(range.start_timestamp.as_deref(), 
Some("20240101000000000"));
+        assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
+        assert!(!range.start_inclusive);
+        assert!(range.end_inclusive);
+    }
+
     #[test]
     fn test_is_in_range_inclusive_bounds() {
         let range = InstantRange::new(
diff --git 
a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql 
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
new file mode 100644
index 0000000..d86e5df
--- /dev/null
+++ 
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.sql
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v6_simplekeygen_nonhivestyle_overwritetable (
+                                              id INT,
+                                              name STRING,
+                                              isActive BOOLEAN,
+                                              shortField SHORT,
+                                              intField INT,
+                                              longField LONG,
+                                              floatField FLOAT,
+                                              doubleField DOUBLE,
+                                              decimalField DECIMAL(10,5),
+                                              dateField DATE,
+                                              timestampField TIMESTAMP,
+                                              binaryField BINARY,
+                                              arrayField 
ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>,  -- Array of structs
+                                              mapField MAP<STRING, 
STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>, 
 -- Map with struct values
+                                              structField STRUCT<
+                                                  field1: STRING,
+                                              field2: INT,
+                                              child_struct: STRUCT<
+                                                  child_field1: DOUBLE,
+                                              child_field2: BOOLEAN
+                                                  >
+                                                  >,
+                                              byteField BYTE
+)
+    USING HUDI
+    LOCATION 
'/opt/data/external_tables/v6_simplekeygen_nonhivestyle_overwritetable'
+TBLPROPERTIES (
+    type = 'mor',
+    primaryKey = 'id',
+    preCombineField = 'longField',
+    'hoodie.metadata.enable' = 'false',
+    'hoodie.datasource.write.hive_style_partitioning' = 'false',
+    'hoodie.datasource.write.drop.partition.columns' = 'false',
+    'hoodie.table.log.file.format' = 'PARQUET',
+    'hoodie.logfile.data.block.format' = 'parquet',
+    'hoodie.datasource.write.record.merger.impls' = 
'org.apache.hudi.HoodieSparkRecordMerger',
+    'hoodie.parquet.small.file.limit' = '0'
+)
+PARTITIONED BY (byteField);
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+                                             (1, 'Alice', true, 300, 15000, 
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), 
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+                                              ARRAY(STRUCT('red', 100), 
STRUCT('blue', 200), STRUCT('green', 300)),
+                                              MAP('key1', STRUCT(123.456, 
true), 'key2', STRUCT(789.012, false)),
+                                              STRUCT('Alice', 30, 
STRUCT(123.456, true)),
+                                              10
+                                             ),
+                                             (2, 'Bob', false, 100, 25000, 
9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), 
CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+                                              ARRAY(STRUCT('yellow', 400), 
STRUCT('purple', 500)),
+                                              MAP('key3', STRUCT(234.567, 
true), 'key4', STRUCT(567.890, false)),
+                                              STRUCT('Bob', 40, 
STRUCT(789.012, false)),
+                                              20
+                                             ),
+                                             (3, 'Carol', true, 200, 35000, 
1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE), 
CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS 
BINARY),
+                                              ARRAY(STRUCT('black', 600), 
STRUCT('white', 700), STRUCT('pink', 800)),
+                                              MAP('key5', STRUCT(345.678, 
true), 'key6', STRUCT(654.321, false)),
+                                              STRUCT('Carol', 25, 
STRUCT(456.789, true)),
+                                              10
+                                             );
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+                                             (1, 'Alice', false, 300, 15000, 
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), 
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+                                              ARRAY(STRUCT('red', 100), 
STRUCT('blue', 200), STRUCT('green', 300)),
+                                              MAP('key1', STRUCT(123.456, 
true), 'key2', STRUCT(789.012, false)),
+                                              STRUCT('Alice', 30, 
STRUCT(123.456, true)),
+                                              10
+                                             ),
+                                             (4, 'Diana', true, 500, 45000, 
987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), 
CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+                                              ARRAY(STRUCT('orange', 900), 
STRUCT('gray', 1000)),
+                                              MAP('key7', STRUCT(456.789, 
true), 'key8', STRUCT(123.456, false)),
+                                              STRUCT('Diana', 50, 
STRUCT(987.654, true)),
+                                              30
+                                             );
+
+INSERT OVERWRITE TABLE v6_simplekeygen_nonhivestyle_overwritetable SELECT
+                                              4, 'Diana', false, 500, 45000, 
987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), 
CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+                                              ARRAY(STRUCT('orange', 900), 
STRUCT('gray', 1000)),
+                                              MAP('key7', STRUCT(456.789, 
true), 'key8', STRUCT(123.456, false)),
+                                              STRUCT('Diana', 50, 
STRUCT(987.654, true)),
+                                              30
+                                             ;
diff --git 
a/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.zip 
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.zip
new file mode 100644
index 0000000..afd3208
Binary files /dev/null and 
b/crates/tests/data/tables/mor/v6_simplekeygen_nonhivestyle_overwritetable.zip 
differ

Reply via email to