liurenjie1024 commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1587013567


##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = 
HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator 
cache for an existing
-                // PartitionEvaluator that matches this manifest's partition 
spec ID.
-                // Use one from the cache if there is one. If not, create one, 
put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if 
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        
manifest_evaluator_cache.insert(entry.partition_spec_id, 
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), 
table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = 
&manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {
+                    let partition_spec_id = entry.partition_spec_id;
+
+                    let (partition_spec, partition_schema) =
+                        
context.create_partition_spec_and_schema(partition_spec_id)?;
+
+                    let partition_filter = partition_filter_cache.get(
+                        partition_spec_id,
+                        partition_spec.clone(),
+                        partition_schema.clone(),
+                        filter,
+                        context.case_sensitive,
+                    )?;
+
+                    let manifest_evaluator = manifest_evaluator_cache.get(
+                        partition_spec_id,
+                        partition_schema.schema_id(),
+                        partition_filter.clone(),
+                        context.case_sensitive,
+                    );
 
-                    // reject any manifest files whose partition values don't 
match the filter.
                     if !manifest_evaluator.eval(entry)? {
                         continue;
                     }
+
+                    // TODO: Create ExpressionEvaluator
                 }
 
-                let manifest = entry.load_manifest(&file_io).await?;
+                let manifest = entry.load_manifest(&context.file_io).await?;

Review Comment:
   Yes, we need to resolve #124 first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to