This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1c66e5a refactor: cache partition_schema in `fn plan_files()` (#362)
1c66e5a is described below
commit 1c66e5a6f5f5382cac12c561ade8147cb8d6c7cd
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Thu May 2 05:51:51 2024 +0200
refactor: cache partition_schema in `fn plan_files()` (#362)
* refactor: add partition_schema_cache
* refactor: use context as param object
* fix: test setup
* refactor: clone only when cache miss
* chore: move derive stmts
* refactor: remove unused case_sensitive parameter
* refactor: remove partition_schema_cache
* refactor: move partition_filter into wider scope
---
.../src/expr/visitors/manifest_evaluator.rs | 14 +--
crates/iceberg/src/scan.rs | 121 +++++++++------------
2 files changed, 54 insertions(+), 81 deletions(-)
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index bcb5967..fd2ebdd 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -21,23 +21,19 @@ use crate::spec::{Datum, FieldSummary, ManifestFile};
use crate::Result;
use fnv::FnvHashSet;
-#[derive(Debug)]
/// Evaluates a [`ManifestFile`] to see if the partition summaries
/// match a provided [`BoundPredicate`].
///
/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
/// in which data might be found that matches the TableScan's filter.
+#[derive(Debug)]
pub(crate) struct ManifestEvaluator {
partition_filter: BoundPredicate,
- case_sensitive: bool,
}
impl ManifestEvaluator {
- pub(crate) fn new(partition_filter: BoundPredicate, case_sensitive: bool) -> Self {
- Self {
- partition_filter,
- case_sensitive,
- }
+ pub(crate) fn new(partition_filter: BoundPredicate) -> Self {
+ Self { partition_filter }
}
/// Evaluate this `ManifestEvaluator`'s filter predicate against the
@@ -310,7 +306,7 @@ mod test {
fn create_partition_schema(
partition_spec: &PartitionSpecRef,
- schema: &SchemaRef,
+ schema: &Schema,
) -> Result<SchemaRef> {
let partition_type = partition_spec.partition_type(schema)?;
@@ -356,7 +352,7 @@ mod test {
case_sensitive,
)?;
- Ok(ManifestEvaluator::new(partition_filter, case_sensitive))
+ Ok(ManifestEvaluator::new(partition_filter))
}
#[test]
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index b842522..c2a5e1b 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -23,8 +23,8 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::spec::{
- DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, PartitionSpecRef, Schema,
- SchemaRef, SnapshotRef, TableMetadataRef,
+ DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef,
+ SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
@@ -203,31 +203,22 @@ impl TableScan {
continue;
}
- if let Some(filter) = context.bound_filter() {
- let partition_spec_id = entry.partition_spec_id;
+ let partition_spec_id = entry.partition_spec_id;
- let (partition_spec, partition_schema) =
- context.create_partition_spec_and_schema(partition_spec_id)?;
-
- let partition_filter = partition_filter_cache.get(
- partition_spec_id,
- partition_spec,
- partition_schema.clone(),
- filter,
- context.case_sensitive,
- )?;
+ let partition_filter = partition_filter_cache.get(
+ partition_spec_id,
+ &context,
+ )?;
+ if let Some(partition_filter) = partition_filter {
let manifest_evaluator = manifest_evaluator_cache.get(
- partition_schema.schema_id(),
- partition_filter.clone(),
- context.case_sensitive,
+ partition_spec_id,
+ partition_filter,
);
if !manifest_evaluator.eval(entry)? {
continue;
}
-
- // TODO: Create ExpressionEvaluator
}
let manifest = entry.load_manifest(&context.file_io).await?;
@@ -321,9 +312,9 @@ impl TableScan {
}
}
-#[derive(Debug)]
/// Holds the context necessary for file scanning operations
/// in a streaming environment.
+#[derive(Debug)]
struct FileScanStreamContext {
schema: SchemaRef,
snapshot: SnapshotRef,
@@ -362,37 +353,11 @@ impl FileScanStreamContext {
fn bound_filter(&self) -> Option<&BoundPredicate> {
self.bound_filter.as_ref()
}
-
- /// Creates a reference-counted [`PartitionSpec`] and a
- /// corresponding [`Schema`] based on the specified partition spec id.
- fn create_partition_spec_and_schema(
- &self,
- spec_id: i32,
- ) -> Result<(PartitionSpecRef, SchemaRef)> {
- let partition_spec =
- self.table_metadata
- .partition_spec_by_id(spec_id)
- .ok_or(Error::new(
- ErrorKind::Unexpected,
- format!("Could not find partition spec for id {}", spec_id),
- ))?;
-
- let partition_type = partition_spec.partition_type(&self.schema)?;
- let partition_fields = partition_type.fields().to_owned();
- let partition_schema = Arc::new(
- Schema::builder()
- .with_schema_id(partition_spec.spec_id)
- .with_fields(partition_fields)
- .build()?,
- );
-
- Ok((partition_spec.clone(), partition_schema))
- }
}
-#[derive(Debug)]
/// Manages the caching of [`BoundPredicate`] objects
/// for [`PartitionSpec`]s based on partition spec id.
+#[derive(Debug)]
struct PartitionFilterCache(HashMap<i32, BoundPredicate>);
impl PartitionFilterCache {
@@ -407,30 +372,47 @@ impl PartitionFilterCache {
fn get(
&mut self,
spec_id: i32,
- partition_spec: PartitionSpecRef,
- partition_schema: SchemaRef,
- filter: &BoundPredicate,
- case_sensitive: bool,
- ) -> Result<&BoundPredicate> {
- match self.0.entry(spec_id) {
- Entry::Occupied(e) => Ok(e.into_mut()),
- Entry::Vacant(e) => {
- let mut inclusive_projection = InclusiveProjection::new(partition_spec);
-
- let partition_filter = inclusive_projection
- .project(filter)?
- .rewrite_not()
- .bind(partition_schema, case_sensitive)?;
-
- Ok(e.insert(partition_filter))
- }
+ context: &FileScanStreamContext,
+ ) -> Result<Option<&BoundPredicate>> {
+ match context.bound_filter() {
+ None => Ok(None),
+ Some(filter) => match self.0.entry(spec_id) {
+ Entry::Occupied(e) => Ok(Some(e.into_mut())),
+ Entry::Vacant(e) => {
+ let partition_spec = context
+ .table_metadata
+ .partition_spec_by_id(spec_id)
+ .ok_or(Error::new(
+ ErrorKind::Unexpected,
+ format!("Could not find partition spec for id {}", spec_id),
+ ))?;
+
+ let partition_type = partition_spec.partition_type(context.schema.as_ref())?;
+ let partition_fields = partition_type.fields().to_owned();
+ let partition_schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(partition_spec.spec_id)
+ .with_fields(partition_fields)
+ .build()?,
+ );
+
+ let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone());
+
+ let partition_filter = inclusive_projection
+ .project(filter)?
+ .rewrite_not()
+ .bind(partition_schema.clone(), context.case_sensitive)?;
+
+ Ok(Some(e.insert(partition_filter)))
+ }
+ },
}
}
}
-#[derive(Debug)]
/// Manages the caching of [`ManifestEvaluator`] objects
/// for [`PartitionSpec`]s based on partition spec id.
+#[derive(Debug)]
struct ManifestEvaluatorCache(HashMap<i32, ManifestEvaluator>);
impl ManifestEvaluatorCache {
@@ -442,15 +424,10 @@ impl ManifestEvaluatorCache {
/// Retrieves a [`ManifestEvaluator`] from the cache
/// or computes it if not present.
- fn get(
- &mut self,
- spec_id: i32,
- partition_filter: BoundPredicate,
- case_sensitive: bool,
- ) -> &mut ManifestEvaluator {
+ fn get(&mut self, spec_id: i32, partition_filter: &BoundPredicate) -> &mut ManifestEvaluator {
self.0
.entry(spec_id)
- .or_insert(ManifestEvaluator::new(partition_filter, case_sensitive))
+ .or_insert(ManifestEvaluator::new(partition_filter.clone()))
}
}