BlakeOrth commented on code in PR #18146:
URL: https://github.com/apache/datafusion/pull/18146#discussion_r2474711423


##########
datafusion/catalog-listing/src/helpers.rs:
##########
@@ -424,80 +380,41 @@ pub async fn pruned_partition_list<'a>(
     file_extension: &'a str,
     partition_cols: &'a [(String, DataType)],
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
-    // if no partition col => simply list all the files
+    let objects = table_path
+        .list_all_files(ctx, store, file_extension)
+        .await?
+        .try_filter(|object_meta| futures::future::ready(object_meta.size > 
0));
+
     if partition_cols.is_empty() {
         if !filters.is_empty() {
             return internal_err!(
                 "Got partition filters for unpartitioned table {}",
                 table_path
             );
         }
-        return Ok(Box::pin(
-            table_path
-                .list_all_files(ctx, store, file_extension)
-                .await?
-                .try_filter(|object_meta| 
futures::future::ready(object_meta.size > 0))
-                .map_ok(|object_meta| object_meta.into()),
-        ));
-    }
-
-    let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
-
-    let partitions =
-        list_partitions(store, table_path, partition_cols.len(), 
partition_prefix)
-            .await?;
-    debug!("Listed {} partitions", partitions.len());
 
-    let pruned =
-        prune_partitions(table_path, partitions, filters, 
partition_cols).await?;
-
-    debug!("Pruning yielded {} partitions", pruned.len());
-
-    let stream = futures::stream::iter(pruned)
-        .map(move |partition: Partition| async move {
-            let cols = partition_cols.iter().map(|x| x.0.as_str());
-            let parsed = parse_partitions_for_path(table_path, 
&partition.path, cols);
+        // if no partition col => simply list all the files
+        Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
+    } else {
+        let df_schema = DFSchema::from_unqualified_fields(
+            partition_cols
+                .iter()
+                .map(|(n, d)| Field::new(n, d.clone(), true))
+                .collect(),
+            Default::default(),
+        )?;
 
-            let partition_values = parsed
-                .into_iter()
-                .flatten()
-                .zip(partition_cols)
-                .map(|(parsed, (_, datatype))| {
-                    ScalarValue::try_from_string(parsed.to_string(), datatype)
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            let files = match partition.files {
-                Some(files) => files,
-                None => {
-                    trace!("Recursively listing partition {}", partition.path);
-                    store.list(Some(&partition.path)).try_collect().await?
-                }
-            };
-            let files = files.into_iter().filter(move |o| {
-                let extension_match = 
o.location.as_ref().ends_with(file_extension);
-                // here need to scan 
subdirectories(`listing_table_ignore_subdirectory` = false)
-                let glob_match = table_path.contains(&o.location, false);
-                extension_match && glob_match
-            });
-
-            let stream = futures::stream::iter(files.map(move |object_meta| {
-                Ok(PartitionedFile {
-                    object_meta,
-                    partition_values: partition_values.clone(),
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                })
-            }));
-
-            Ok::<_, DataFusionError>(stream)
-        })
-        .buffer_unordered(CONCURRENCY_LIMIT)

Review Comment:
   @alamb I just posted some benchmarks and results in
    - https://github.com/apache/datafusion/pull/18361
    
   I think these can help inform potential performance trade-offs related to 
this PR. I saw your mention on the test harness PR, and I will start looking 
into adding cases related to this work next.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to