This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new c47f52a  refactor: use static MetaField schema for incr query (#252)
c47f52a is described below

commit c47f52a350e98c814dc483e91e3798f9948c70d8
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 19 23:26:00 2025 -0600

    refactor: use static MetaField schema for incr query (#252)
---
 Cargo.toml                             |  1 +
 crates/core/Cargo.toml                 |  1 +
 crates/core/src/metadata/meta_field.rs | 59 ++++++++++++++++++++++++++++++++++
 crates/core/src/table/mod.rs           | 14 ++++----
 4 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 976efea..ab764ec 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,6 +64,7 @@ serde_json = { version = "1.0" }
 thiserror = { version = "2.0.10" }
 bytes = { version = "1" }
 chrono = { version = "0.4" }
+lazy_static = { version = "1.5.0" }
 log = { version = "0.4" }
 paste = { version = "1.0.15" }
 strum = { version = "0.26", features = ["derive"] }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index fe8c79d..b835b43 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -51,6 +51,7 @@ serde_json = { workspace = true }
 thiserror = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
+lazy_static = { workspace = true }
 log = { workspace = true }
 paste = { workspace = true }
 strum = { workspace = true }
diff --git a/crates/core/src/metadata/meta_field.rs b/crates/core/src/metadata/meta_field.rs
index 240aeff..7340ebc 100644
--- a/crates/core/src/metadata/meta_field.rs
+++ b/crates/core/src/metadata/meta_field.rs
@@ -18,8 +18,11 @@
  */
 use crate::error::CoreError;
 use crate::Result;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use lazy_static::lazy_static;
 use std::fmt::Display;
 use std::str::FromStr;
+use std::sync::Arc;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum MetaField {
@@ -66,11 +69,37 @@ impl FromStr for MetaField {
     }
 }
 
+lazy_static! {
+    static ref SCHEMA: Arc<Schema> = Arc::new(Schema::new(vec![
+        Field::new(MetaField::CommitTime.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::RecordKey.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::PartitionPath.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::FileName.as_ref(), DataType::Utf8, false),
+    ]));
+    static ref SCHEMA_WITH_OPERATION: Arc<Schema> = Arc::new(Schema::new(vec![
+        Field::new(MetaField::CommitTime.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::RecordKey.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::PartitionPath.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::FileName.as_ref(), DataType::Utf8, false),
+        Field::new(MetaField::Operation.as_ref(), DataType::Utf8, false),
+    ]));
+}
+
 impl MetaField {
     #[inline]
     pub fn field_index(&self) -> usize {
         self.clone() as usize
     }
+
+    pub fn schema() -> SchemaRef {
+        SCHEMA.clone()
+    }
+
+    pub fn schema_with_operation() -> SchemaRef {
+        SCHEMA_WITH_OPERATION.clone()
+    }
 }
 
 #[cfg(test)]
@@ -171,4 +200,34 @@ mod tests {
         }
         Ok(())
     }
+
+    #[test]
+    fn test_get_schema() {
+        assert_eq!(MetaField::schema().fields.len(), 5);
+        assert_eq!(
+            MetaField::schema().fields[0].name(),
+            MetaField::CommitTime.as_ref()
+        );
+        assert_eq!(
+            MetaField::schema().fields[1].name(),
+            MetaField::CommitSeqno.as_ref()
+        );
+        assert_eq!(
+            MetaField::schema().fields[2].name(),
+            MetaField::RecordKey.as_ref()
+        );
+        assert_eq!(
+            MetaField::schema().fields[3].name(),
+            MetaField::PartitionPath.as_ref()
+        );
+        assert_eq!(
+            MetaField::schema().fields[4].name(),
+            MetaField::FileName.as_ref()
+        );
+        assert_eq!(MetaField::schema_with_operation().fields.len(), 6);
+        assert_eq!(
+            MetaField::schema_with_operation().fields[5].name(),
+            MetaField::Operation.as_ref()
+        );
+    }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index c1db54d..4c92faf 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -104,6 +104,7 @@ use crate::table::partition::PartitionPruner;
 use crate::timeline::Timeline;
 use crate::Result;
 
+use crate::metadata::meta_field::MetaField;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use std::collections::{HashMap, HashSet};
@@ -263,16 +264,16 @@ impl Table {
         )
     }
 
-    pub async fn create_file_group_reader_with_filters(
+    pub fn create_file_group_reader_with_filters(
         &self,
         filters: &[Filter],
+        schema: &Schema,
     ) -> Result<FileGroupReader> {
-        let schema = self.get_schema().await?;
         FileGroupReader::new_with_filters(
             self.file_system_view.storage.clone(),
             self.hudi_configs.clone(),
             filters,
-            &schema,
+            schema,
         )
     }
 
@@ -335,10 +336,11 @@ impl Table {
 
         // Read incremental records from the file slices.
         let filters = &[
-            FilterField::new("_hoodie_commit_time").gt(start_timestamp),
-            FilterField::new("_hoodie_commit_time").lte(as_of_timestamp),
+            FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
+            FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp),
         ];
-        let fg_reader = self.create_file_group_reader_with_filters(filters).await?;
+        let fg_reader =
+            self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
         let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
         let batches = futures::future::try_join_all(
             file_slices

Reply via email to