This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c47f52a refactor: use static MetaField schema for incr query (#252)
c47f52a is described below
commit c47f52a350e98c814dc483e91e3798f9948c70d8
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 19 23:26:00 2025 -0600
refactor: use static MetaField schema for incr query (#252)
---
Cargo.toml | 1 +
crates/core/Cargo.toml | 1 +
crates/core/src/metadata/meta_field.rs | 59 ++++++++++++++++++++++++++++++++++
crates/core/src/table/mod.rs | 14 ++++----
4 files changed, 69 insertions(+), 6 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 976efea..ab764ec 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,6 +64,7 @@ serde_json = { version = "1.0" }
thiserror = { version = "2.0.10" }
bytes = { version = "1" }
chrono = { version = "0.4" }
+lazy_static = { version = "1.5.0" }
log = { version = "0.4" }
paste = { version = "1.0.15" }
strum = { version = "0.26", features = ["derive"] }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index fe8c79d..b835b43 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -51,6 +51,7 @@ serde_json = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
+lazy_static = { workspace = true }
log = { workspace = true }
paste = { workspace = true }
strum = { workspace = true }
diff --git a/crates/core/src/metadata/meta_field.rs b/crates/core/src/metadata/meta_field.rs
index 240aeff..7340ebc 100644
--- a/crates/core/src/metadata/meta_field.rs
+++ b/crates/core/src/metadata/meta_field.rs
@@ -18,8 +18,11 @@
*/
use crate::error::CoreError;
use crate::Result;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use lazy_static::lazy_static;
use std::fmt::Display;
use std::str::FromStr;
+use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetaField {
@@ -66,11 +69,37 @@ impl FromStr for MetaField {
}
}
+lazy_static! {
+ static ref SCHEMA: Arc<Schema> = Arc::new(Schema::new(vec![
+ Field::new(MetaField::CommitTime.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::RecordKey.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::PartitionPath.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::FileName.as_ref(), DataType::Utf8, false),
+ ]));
+ static ref SCHEMA_WITH_OPERATION: Arc<Schema> = Arc::new(Schema::new(vec![
+ Field::new(MetaField::CommitTime.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::CommitSeqno.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::RecordKey.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::PartitionPath.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::FileName.as_ref(), DataType::Utf8, false),
+ Field::new(MetaField::Operation.as_ref(), DataType::Utf8, false),
+ ]));
+}
+
impl MetaField {
#[inline]
pub fn field_index(&self) -> usize {
self.clone() as usize
}
+
+ pub fn schema() -> SchemaRef {
+ SCHEMA.clone()
+ }
+
+ pub fn schema_with_operation() -> SchemaRef {
+ SCHEMA_WITH_OPERATION.clone()
+ }
}
#[cfg(test)]
@@ -171,4 +200,34 @@ mod tests {
}
Ok(())
}
+
+ #[test]
+ fn test_get_schema() {
+ assert_eq!(MetaField::schema().fields.len(), 5);
+ assert_eq!(
+ MetaField::schema().fields[0].name(),
+ MetaField::CommitTime.as_ref()
+ );
+ assert_eq!(
+ MetaField::schema().fields[1].name(),
+ MetaField::CommitSeqno.as_ref()
+ );
+ assert_eq!(
+ MetaField::schema().fields[2].name(),
+ MetaField::RecordKey.as_ref()
+ );
+ assert_eq!(
+ MetaField::schema().fields[3].name(),
+ MetaField::PartitionPath.as_ref()
+ );
+ assert_eq!(
+ MetaField::schema().fields[4].name(),
+ MetaField::FileName.as_ref()
+ );
+ assert_eq!(MetaField::schema_with_operation().fields.len(), 6);
+ assert_eq!(
+ MetaField::schema_with_operation().fields[5].name(),
+ MetaField::Operation.as_ref()
+ );
+ }
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index c1db54d..4c92faf 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -104,6 +104,7 @@ use crate::table::partition::PartitionPruner;
use crate::timeline::Timeline;
use crate::Result;
+use crate::metadata::meta_field::MetaField;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use std::collections::{HashMap, HashSet};
@@ -263,16 +264,16 @@ impl Table {
)
}
- pub async fn create_file_group_reader_with_filters(
+ pub fn create_file_group_reader_with_filters(
&self,
filters: &[Filter],
+ schema: &Schema,
) -> Result<FileGroupReader> {
- let schema = self.get_schema().await?;
FileGroupReader::new_with_filters(
self.file_system_view.storage.clone(),
self.hudi_configs.clone(),
filters,
- &schema,
+ schema,
)
}
@@ -335,10 +336,11 @@ impl Table {
// Read incremental records from the file slices.
let filters = &[
- FilterField::new("_hoodie_commit_time").gt(start_timestamp),
- FilterField::new("_hoodie_commit_time").lte(as_of_timestamp),
+ FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
+ FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp),
];
- let fg_reader = self.create_file_group_reader_with_filters(filters).await?;
+ let fg_reader =
+     self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
let batches = futures::future::try_join_all(
file_slices