This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0154829  feat: implement manifest filtering in TableScan (#323)
0154829 is described below

commit 0154829566d12236fd69fc313f2b51c2f8e82c4a
Author: Scott Donnelly <[email protected]>
AuthorDate: Fri Apr 26 06:36:27 2024 +0100

    feat: implement manifest filtering in TableScan (#323)
---
 crates/iceberg/src/scan.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 79 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index b96c470..36f71c1 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,14 +18,20 @@
 //! Table scan api.
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
+use crate::expr::{Bind, Predicate};
 use crate::io::FileIO;
-use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, 
TableMetadataRef};
+use crate::spec::{
+    DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadata, 
TableMetadataRef,
+};
 use crate::table::Table;
 use crate::{Error, ErrorKind};
 use arrow_array::RecordBatch;
 use async_stream::try_stream;
 use futures::stream::{iter, BoxStream};
 use futures::StreamExt;
+use std::collections::HashMap;
+use std::sync::Arc;
 
 /// Builder to create table scan.
 pub struct TableScanBuilder<'a> {
@@ -34,6 +40,8 @@ pub struct TableScanBuilder<'a> {
     column_names: Vec<String>,
     snapshot_id: Option<i64>,
     batch_size: Option<usize>,
+    case_sensitive: bool,
+    filter: Option<Predicate>,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -43,6 +51,8 @@ impl<'a> TableScanBuilder<'a> {
             column_names: vec![],
             snapshot_id: None,
             batch_size: None,
+            case_sensitive: true,
+            filter: None,
         }
     }
 
@@ -53,6 +63,20 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Sets the scan's case sensitivity.
+    ///
+    /// Every other builder setting is carried over unchanged.
+    pub fn with_case_sensitive(self, case_sensitive: bool) -> Self {
+        Self {
+            case_sensitive,
+            ..self
+        }
+    }
+
+    /// Attaches `predicate` to the scan as its filter.
+    ///
+    /// The predicate is stored after `rewrite_not` has been applied to it.
+    pub fn with_filter(self, predicate: Predicate) -> Self {
+        // The manifest evaluator requires that no Not nodes are present,
+        // so they are rewritten away before the filter is stored.
+        let filter = Some(predicate.rewrite_not());
+        Self { filter, ..self }
+    }
+
     /// Select all columns.
     pub fn select_all(mut self) -> Self {
         self.column_names.clear();
@@ -125,6 +149,8 @@ impl<'a> TableScanBuilder<'a> {
             column_names: self.column_names,
             schema,
             batch_size: self.batch_size,
+            case_sensitive: self.case_sensitive,
+            filter: self.filter.map(Arc::new),
         })
     }
 }
@@ -139,6 +165,8 @@ pub struct TableScan {
     column_names: Vec<String>,
     schema: SchemaRef,
     batch_size: Option<usize>,
+    case_sensitive: bool,
+    filter: Option<Arc<Predicate>>,
 }
 
 /// A stream of [`FileScanTask`].
@@ -146,10 +174,20 @@ pub type FileScanTaskStream = BoxStream<'static, 
crate::Result<FileScanTask>>;
 
 impl TableScan {
     /// Returns a stream of file scan tasks.
+
     pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+        // Cache of `ManifestEvaluator`s created as part of this scan, keyed by partition spec ID
+        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = 
HashMap::new();
+
+        // These locals are copied out of `self` so that no reference to
+        // `self` has to be passed into `try_stream!`: the returned stream
+        // is `'static`, so anything it borrowed would have to outlive 'static.
+        let schema = self.schema.clone();
         let snapshot = self.snapshot.clone();
         let table_metadata = self.table_metadata.clone();
         let file_io = self.file_io.clone();
+        let case_sensitive = self.case_sensitive;
+        let filter = self.filter.clone();
 
         Ok(try_stream! {
             let manifest_list = snapshot
@@ -158,8 +196,24 @@ impl TableScan {
             .await?;
 
             // Generate data file stream
-            let mut entries = iter(manifest_list.entries());
-            while let Some(entry) = entries.next().await {
+            for entry in manifest_list.entries() {
+                // If this scan has a filter, check the manifest evaluator 
cache for an existing
+                // ManifestEvaluator that matches this manifest's partition 
spec ID.
+                // Use one from the cache if there is one. If not, create one, 
put it in
+                // the cache, and take a reference to it.
+                #[allow(clippy::map_entry)]
+                if let Some(filter) = filter.as_ref() {
+                    if 
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
+                        
manifest_evaluator_cache.insert(entry.partition_spec_id, 
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), 
table_metadata.clone(), case_sensitive, filter)?);
+                    }
+                    let manifest_evaluator = 
&manifest_evaluator_cache[&entry.partition_spec_id];
+
+                    // reject any manifest files whose partition values don't 
match the filter.
+                    if !manifest_evaluator.eval(entry)? {
+                        continue;
+                    }
+                }
+
                 let manifest = entry.load_manifest(&file_io).await?;
 
                 let mut manifest_entries = 
iter(manifest.entries().iter().filter(|e| e.is_alive()));
@@ -186,6 +240,28 @@ impl TableScan {
         .boxed())
     }
 
+    /// Builds the [`ManifestEvaluator`] for the partition spec identified by `id`.
+    ///
+    /// `filter` is bound against `schema` (honouring `case_sensitive`), and the
+    /// resulting bound predicate is handed to [`ManifestEvaluator::new`] along
+    /// with the partition spec looked up in `table_metadata`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if binding the filter fails, if `table_metadata` has no
+    /// partition spec with the given `id` ([`ErrorKind::Unexpected`]), or if
+    /// [`ManifestEvaluator::new`] itself fails.
+    fn create_manifest_evaluator(
+        id: i32,
+        schema: SchemaRef,
+        table_metadata: Arc<TableMetadata>,
+        case_sensitive: bool,
+        filter: &Predicate,
+    ) -> crate::Result<ManifestEvaluator> {
+        let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;
+
+        // `ok_or_else` defers building the error (and its formatted message)
+        // to the failure path; `ok_or` would allocate it on every call.
+        let partition_spec = table_metadata.partition_spec_by_id(id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Could not find partition spec for id {id}"),
+            )
+        })?;
+
+        // Last use of `schema`, so it can be moved instead of cloned again.
+        ManifestEvaluator::new(
+            partition_spec.clone(),
+            schema,
+            bound_predicate,
+            case_sensitive,
+        )
+    }
+
     pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
         let mut arrow_reader_builder =
             ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());

Reply via email to