This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0154829 feat: implement manifest filtering in TableScan (#323)
0154829 is described below
commit 0154829566d12236fd69fc313f2b51c2f8e82c4a
Author: Scott Donnelly <[email protected]>
AuthorDate: Fri Apr 26 06:36:27 2024 +0100
feat: implement manifest filtering in TableScan (#323)
---
crates/iceberg/src/scan.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 79 insertions(+), 3 deletions(-)
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index b96c470..36f71c1 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,14 +18,20 @@
//! Table scan api.
use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
+use crate::expr::{Bind, Predicate};
use crate::io::FileIO;
-use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef,
TableMetadataRef};
+use crate::spec::{
+ DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata,
TableMetadataRef,
+};
use crate::table::Table;
use crate::{Error, ErrorKind};
use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;
+use std::collections::HashMap;
+use std::sync::Arc;
/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
@@ -34,6 +40,8 @@ pub struct TableScanBuilder<'a> {
column_names: Vec<String>,
snapshot_id: Option<i64>,
batch_size: Option<usize>,
+ case_sensitive: bool,
+ filter: Option<Predicate>,
}
impl<'a> TableScanBuilder<'a> {
@@ -43,6 +51,8 @@ impl<'a> TableScanBuilder<'a> {
column_names: vec![],
snapshot_id: None,
batch_size: None,
+ case_sensitive: true,
+ filter: None,
}
}
@@ -53,6 +63,20 @@ impl<'a> TableScanBuilder<'a> {
self
}
+ /// Sets the scan's case sensitivity
+ pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
+ self.case_sensitive = case_sensitive;
+ self
+ }
+
+ /// Specifies a predicate to use as a filter
+ pub fn with_filter(mut self, predicate: Predicate) -> Self {
+ // calls rewrite_not to remove Not nodes, which must be absent
+ // when applying the manifest evaluator
+ self.filter = Some(predicate.rewrite_not());
+ self
+ }
+
/// Select all columns.
pub fn select_all(mut self) -> Self {
self.column_names.clear();
@@ -125,6 +149,8 @@ impl<'a> TableScanBuilder<'a> {
column_names: self.column_names,
schema,
batch_size: self.batch_size,
+ case_sensitive: self.case_sensitive,
+ filter: self.filter.map(Arc::new),
})
}
}
@@ -139,6 +165,8 @@ pub struct TableScan {
column_names: Vec<String>,
schema: SchemaRef,
batch_size: Option<usize>,
+ case_sensitive: bool,
+ filter: Option<Arc<Predicate>>,
}
/// A stream of [`FileScanTask`].
@@ -146,10 +174,20 @@ pub type FileScanTaskStream = BoxStream<'static,
crate::Result<FileScanTask>>;
impl TableScan {
/// Returns a stream of file scan tasks.
+
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+ // Cache `ManifestEvaluatorFactory`s created as part of this scan
+ let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> =
HashMap::new();
+
+ // these variables needed to ensure that we don't need to pass a
+ // reference to self into `try_stream`, as it expects references
+ // passed in to outlive 'static
+ let schema = self.schema.clone();
let snapshot = self.snapshot.clone();
let table_metadata = self.table_metadata.clone();
let file_io = self.file_io.clone();
+ let case_sensitive = self.case_sensitive;
+ let filter = self.filter.clone();
Ok(try_stream! {
let manifest_list = snapshot
@@ -158,8 +196,24 @@ impl TableScan {
.await?;
// Generate data file stream
- let mut entries = iter(manifest_list.entries());
- while let Some(entry) = entries.next().await {
+ for entry in manifest_list.entries() {
+ // If this scan has a filter, check the partition evaluator
cache for an existing
+ // PartitionEvaluator that matches this manifest's partition
spec ID.
+ // Use one from the cache if there is one. If not, create one,
put it in
+ // the cache, and take a reference to it.
+ #[allow(clippy::map_entry)]
+ if let Some(filter) = filter.as_ref() {
+ if
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
+
manifest_evaluator_cache.insert(entry.partition_spec_id,
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(),
table_metadata.clone(), case_sensitive, filter)?);
+ }
+ let manifest_evaluator =
&manifest_evaluator_cache[&entry.partition_spec_id];
+
+ // reject any manifest files whose partition values don't
match the filter.
+ if !manifest_evaluator.eval(entry)? {
+ continue;
+ }
+ }
+
let manifest = entry.load_manifest(&file_io).await?;
let mut manifest_entries =
iter(manifest.entries().iter().filter(|e| e.is_alive()));
@@ -186,6 +240,28 @@ impl TableScan {
.boxed())
}
+ fn create_manifest_evaluator(
+ id: i32,
+ schema: SchemaRef,
+ table_metadata: Arc<TableMetadata>,
+ case_sensitive: bool,
+ filter: &Predicate,
+ ) -> crate::Result<ManifestEvaluator> {
+ let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;
+
+ let partition_spec =
table_metadata.partition_spec_by_id(id).ok_or(Error::new(
+ ErrorKind::Unexpected,
+ format!("Could not find partition spec for id {id}"),
+ ))?;
+
+ ManifestEvaluator::new(
+ partition_spec.clone(),
+ schema.clone(),
+ bound_predicate,
+ case_sensitive,
+ )
+ }
+
pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder =
ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());