This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c66e5a  refactor: cache partition_schema in `fn plan_files()` (#362)
1c66e5a is described below

commit 1c66e5a6f5f5382cac12c561ade8147cb8d6c7cd
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Thu May 2 05:51:51 2024 +0200

    refactor: cache partition_schema in `fn plan_files()` (#362)
    
    * refactor: add partition_schema_cache
    
    * refactor: use context as param object
    
    * fix: test setup
    
    * refactor: clone only when cache miss
    
    * chore: move derive stmts
    
    * refactor: remove unused case_sensitive parameter
    
    * refactor: remove partition_schema_cache
    
    * refactor: move partition_filter into wider scope
---
 .../src/expr/visitors/manifest_evaluator.rs        |  14 +--
 crates/iceberg/src/scan.rs                         | 121 +++++++++------------
 2 files changed, 54 insertions(+), 81 deletions(-)

diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index bcb5967..fd2ebdd 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -21,23 +21,19 @@ use crate::spec::{Datum, FieldSummary, ManifestFile};
 use crate::Result;
 use fnv::FnvHashSet;
 
-#[derive(Debug)]
 /// Evaluates a [`ManifestFile`] to see if the partition summaries
 /// match a provided [`BoundPredicate`].
 ///
 /// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
 /// in which data might be found that matches the TableScan's filter.
+#[derive(Debug)]
 pub(crate) struct ManifestEvaluator {
     partition_filter: BoundPredicate,
-    case_sensitive: bool,
 }
 
 impl ManifestEvaluator {
-    pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool) 
-> Self {
-        Self {
-            partition_filter,
-            case_sensitive,
-        }
+    pub(crate) fn new(partition_filter: BoundPredicate) -> Self {
+        Self { partition_filter }
     }
 
     /// Evaluate this `ManifestEvaluator`'s filter predicate against the
@@ -310,7 +306,7 @@ mod test {
 
     fn create_partition_schema(
         partition_spec: &PartitionSpecRef,
-        schema: &SchemaRef,
+        schema: &Schema,
     ) -> Result<SchemaRef> {
         let partition_type = partition_spec.partition_type(schema)?;
 
@@ -356,7 +352,7 @@ mod test {
             case_sensitive,
         )?;
 
-        Ok(ManifestEvaluator::new(partition_filter, case_sensitive))
+        Ok(ManifestEvaluator::new(partition_filter))
     }
 
     #[test]
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index b842522..c2a5e1b 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -23,8 +23,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
 use crate::spec::{
-    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, 
PartitionSpecRef, Schema,
-    SchemaRef, SnapshotRef, TableMetadataRef,
+    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, 
Schema, SchemaRef,
+    SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
@@ -203,31 +203,22 @@ impl TableScan {
                     continue;
                 }
 
-                if let Some(filter) = context.bound_filter() {
-                    let partition_spec_id = entry.partition_spec_id;
+                let partition_spec_id = entry.partition_spec_id;
 
-                    let (partition_spec, partition_schema) =
-                        
context.create_partition_spec_and_schema(partition_spec_id)?;
-
-                    let partition_filter = partition_filter_cache.get(
-                        partition_spec_id,
-                        partition_spec,
-                        partition_schema.clone(),
-                        filter,
-                        context.case_sensitive,
-                    )?;
+                let partition_filter = partition_filter_cache.get(
+                    partition_spec_id,
+                    &context,
+                )?;
 
+                if let Some(partition_filter) = partition_filter {
                     let manifest_evaluator = manifest_evaluator_cache.get(
-                        partition_schema.schema_id(),
-                        partition_filter.clone(),
-                        context.case_sensitive,
+                        partition_spec_id,
+                        partition_filter,
                     );
 
                     if !manifest_evaluator.eval(entry)? {
                         continue;
                     }
-
-                    // TODO: Create ExpressionEvaluator
                 }
 
                 let manifest = entry.load_manifest(&context.file_io).await?;
@@ -321,9 +312,9 @@ impl TableScan {
     }
 }
 
-#[derive(Debug)]
 /// Holds the context necessary for file scanning operations
 /// in a streaming environment.
+#[derive(Debug)]
 struct FileScanStreamContext {
     schema: SchemaRef,
     snapshot: SnapshotRef,
@@ -362,37 +353,11 @@ impl FileScanStreamContext {
     fn bound_filter(&self) -> Option<&BoundPredicate> {
         self.bound_filter.as_ref()
     }
-
-    /// Creates a reference-counted [`PartitionSpec`] and a
-    /// corresponding [`Schema`] based on the specified partition spec id.
-    fn create_partition_spec_and_schema(
-        &self,
-        spec_id: i32,
-    ) -> Result<(PartitionSpecRef, SchemaRef)> {
-        let partition_spec =
-            self.table_metadata
-                .partition_spec_by_id(spec_id)
-                .ok_or(Error::new(
-                    ErrorKind::Unexpected,
-                    format!("Could not find partition spec for id {}", 
spec_id),
-                ))?;
-
-        let partition_type = partition_spec.partition_type(&self.schema)?;
-        let partition_fields = partition_type.fields().to_owned();
-        let partition_schema = Arc::new(
-            Schema::builder()
-                .with_schema_id(partition_spec.spec_id)
-                .with_fields(partition_fields)
-                .build()?,
-        );
-
-        Ok((partition_spec.clone(), partition_schema))
-    }
 }
 
-#[derive(Debug)]
 /// Manages the caching of [`BoundPredicate`] objects
 /// for [`PartitionSpec`]s based on partition spec id.
+#[derive(Debug)]
 struct PartitionFilterCache(HashMap<i32, BoundPredicate>);
 
 impl PartitionFilterCache {
@@ -407,30 +372,47 @@ impl PartitionFilterCache {
     fn get(
         &mut self,
         spec_id: i32,
-        partition_spec: PartitionSpecRef,
-        partition_schema: SchemaRef,
-        filter: &BoundPredicate,
-        case_sensitive: bool,
-    ) -> Result<&BoundPredicate> {
-        match self.0.entry(spec_id) {
-            Entry::Occupied(e) => Ok(e.into_mut()),
-            Entry::Vacant(e) => {
-                let mut inclusive_projection = 
InclusiveProjection::new(partition_spec);
-
-                let partition_filter = inclusive_projection
-                    .project(filter)?
-                    .rewrite_not()
-                    .bind(partition_schema, case_sensitive)?;
-
-                Ok(e.insert(partition_filter))
-            }
+        context: &FileScanStreamContext,
+    ) -> Result<Option<&BoundPredicate>> {
+        match context.bound_filter() {
+            None => Ok(None),
+            Some(filter) => match self.0.entry(spec_id) {
+                Entry::Occupied(e) => Ok(Some(e.into_mut())),
+                Entry::Vacant(e) => {
+                    let partition_spec = context
+                        .table_metadata
+                        .partition_spec_by_id(spec_id)
+                        .ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            format!("Could not find partition spec for id {}", 
spec_id),
+                        ))?;
+
+                    let partition_type = 
partition_spec.partition_type(context.schema.as_ref())?;
+                    let partition_fields = partition_type.fields().to_owned();
+                    let partition_schema = Arc::new(
+                        Schema::builder()
+                            .with_schema_id(partition_spec.spec_id)
+                            .with_fields(partition_fields)
+                            .build()?,
+                    );
+
+                    let mut inclusive_projection = 
InclusiveProjection::new(partition_spec.clone());
+
+                    let partition_filter = inclusive_projection
+                        .project(filter)?
+                        .rewrite_not()
+                        .bind(partition_schema.clone(), 
context.case_sensitive)?;
+
+                    Ok(Some(e.insert(partition_filter)))
+                }
+            },
         }
     }
 }
 
-#[derive(Debug)]
 /// Manages the caching of [`ManifestEvaluator`] objects
 /// for [`PartitionSpec`]s based on partition spec id.
+#[derive(Debug)]
 struct ManifestEvaluatorCache(HashMap<i32, ManifestEvaluator>);
 
 impl ManifestEvaluatorCache {
@@ -442,15 +424,10 @@ impl ManifestEvaluatorCache {
 
     /// Retrieves a [`ManifestEvaluator`] from the cache
     /// or computes it if not present.
-    fn get(
-        &mut self,
-        spec_id: i32,
-        partition_filter: BoundPredicate,
-        case_sensitive: bool,
-    ) -> &mut ManifestEvaluator {
+    fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut 
ManifestEvaluator {
         self.0
             .entry(spec_id)
-            .or_insert(ManifestEvaluator::new(partition_filter, 
case_sensitive))
+            .or_insert(ManifestEvaluator::new(partition_filter.clone()))
     }
 }
 

Reply via email to