sdd commented on code in PR #373:
URL: https://github.com/apache/iceberg-rust/pull/373#discussion_r1682309951
##########
crates/iceberg/src/scan.rs:
##########
@@ -389,12 +333,158 @@ impl FileScanStreamContext {
file_io,
bound_filter,
case_sensitive,
+ sender,
+ manifest_evaluator_cache: ManifestEvaluatorCache::new(),
+ partition_filter_cache: PartitionFilterCache::new(),
+ expression_evaluator_cache:
Arc::new(Mutex::new(ExpressionEvaluatorCache::new())),
})
}
- /// Returns a reference to the [`BoundPredicate`] filter.
- fn bound_filter(&self) -> Option<&BoundPredicate> {
- self.bound_filter.as_ref()
+ async fn run(&mut self, manifest_list: ManifestList) -> Result<()> {
+ let file_io = self.file_io.clone();
+ let sender = self.sender.clone();
+
+ // This whole Vec-and-for-loop approach feels sub-optimally structured.
+ // I've tried structuring this in multiple ways but run into
+ // issues with ownership. Ideally I'd like to structure this
+ // with a functional programming approach: extracting
+ // sections 1, 2, and 3 out into different methods on Self,
+ // and then use some iterator combinators to orchestrate it all.
+ // Section 1 is pretty trivially refactorable into a static method
+ // that can be used in a closure that can be used with
Iterator::filter.
+ // Similarly, section 3 seems easily factor-able into a method that can
+ // be passed into Iterator::map.
+ // Section 2 turns out trickier - we want to exit the entire `run`
method early
+ // if the eval fails, and filter out any manifest_files from the
iterator / stream
+ // if the eval succeeds but returns true. We bump into ownership
issues due
+ // to needing to pass mut self as the caches need to be able to mutate.
+ // Section 3 runs into ownership issues when trying to refactor its
closure to be
+ // a static or non-static method.
+
+ // 1
+ let filtered_manifest_files = manifest_list
+ .entries()
+ .iter()
+ .filter(Self::reject_unsupported_content_types);
+
+ // 2
+ let mut filtered_manifest_files2 = vec![];
+ for manifest_file in filtered_manifest_files {
+ if !self.apply_evaluator(manifest_file)? {
+ continue;
+ }
+
+ filtered_manifest_files2.push(manifest_file);
+ }
+
+ // 3
+ let filtered_manifest_files =
filtered_manifest_files2.into_iter().map(|manifest_file| {
+ Ok(ManifestFileProcessingContext {
+ manifest_file,
+ file_io: file_io.clone(),
+ sender: sender.clone(),
+ filter: &self.bound_filter,
+ expression_evaluator_cache:
self.expression_evaluator_cache.clone(),
+ })
+ });
+
+ futures::stream::iter(filtered_manifest_files)
+ .try_for_each_concurrent(
Review Comment:
Hey @liurenjie1024 - I think I'm happy with this now. Let me know what you
think.
There is one issue remaining. It seems that our `crate::runtime::spawn` does
not behave exactly the same as `tokio::spawn` for tasks that are passed an
async function that returns a `Result`: I could not get this to work with
`crate::runtime::spawn`, but it does work with `tokio::spawn`. @odysa, could you help?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]