BlakeOrth commented on code in PR #18146:
URL: https://github.com/apache/datafusion/pull/18146#discussion_r2453457296


##########
datafusion/catalog-listing/src/helpers.rs:
##########
@@ -424,80 +380,41 @@ pub async fn pruned_partition_list<'a>(
     file_extension: &'a str,
     partition_cols: &'a [(String, DataType)],
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
-    // if no partition col => simply list all the files
+    let objects = table_path
+        .list_all_files(ctx, store, file_extension)
+        .await?
+        .try_filter(|object_meta| futures::future::ready(object_meta.size > 
0));
+
     if partition_cols.is_empty() {
         if !filters.is_empty() {
             return internal_err!(
                 "Got partition filters for unpartitioned table {}",
                 table_path
             );
         }
-        return Ok(Box::pin(
-            table_path
-                .list_all_files(ctx, store, file_extension)
-                .await?
-                .try_filter(|object_meta| 
futures::future::ready(object_meta.size > 0))
-                .map_ok(|object_meta| object_meta.into()),
-        ));
-    }
-
-    let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
-
-    let partitions =
-        list_partitions(store, table_path, partition_cols.len(), 
partition_prefix)
-            .await?;
-    debug!("Listed {} partitions", partitions.len());
 
-    let pruned =
-        prune_partitions(table_path, partitions, filters, 
partition_cols).await?;
-
-    debug!("Pruning yielded {} partitions", pruned.len());
-
-    let stream = futures::stream::iter(pruned)
-        .map(move |partition: Partition| async move {
-            let cols = partition_cols.iter().map(|x| x.0.as_str());
-            let parsed = parse_partitions_for_path(table_path, 
&partition.path, cols);
+        // if no partition col => simply list all the files
+        Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
+    } else {
+        let df_schema = DFSchema::from_unqualified_fields(
+            partition_cols
+                .iter()
+                .map(|(n, d)| Field::new(n, d.clone(), true))
+                .collect(),
+            Default::default(),
+        )?;
 
-            let partition_values = parsed
-                .into_iter()
-                .flatten()
-                .zip(partition_cols)
-                .map(|(parsed, (_, datatype))| {
-                    ScalarValue::try_from_string(parsed.to_string(), datatype)
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            let files = match partition.files {
-                Some(files) => files,
-                None => {
-                    trace!("Recursively listing partition {}", partition.path);
-                    store.list(Some(&partition.path)).try_collect().await?
-                }
-            };
-            let files = files.into_iter().filter(move |o| {
-                let extension_match = 
o.location.as_ref().ends_with(file_extension);
-                // here need to scan 
subdirectories(`listing_table_ignore_subdirectory` = false)
-                let glob_match = table_path.contains(&o.location, false);
-                extension_match && glob_match
-            });
-
-            let stream = futures::stream::iter(files.map(move |object_meta| {
-                Ok(PartitionedFile {
-                    object_meta,
-                    partition_values: partition_values.clone(),
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                })
-            }));
-
-            Ok::<_, DataFusionError>(stream)
-        })
-        .buffer_unordered(CONCURRENCY_LIMIT)

Review Comment:
   Yes, although it's subtly more complex than that. This existing 
implementation doesn't blindly list all known partitions in parallel. It's 
probably more fair to say that it potentially filters known partitions and then 
rediscovers all un-pruned partitions in parallel. I think an example probably 
shows this off pretty well.
   
   Given a table with the following structure:
   ```console
   test_table/
   ├── a=1
   │   └── b=10
   │       └── c=100
   │           └── file1.parquet
   ├── a=2
   │   └── b=20
   │       └── c=200
   │           └── file2.parquet
   └── a=3
       └── b=30
           └── c=300
               └── file2.parquet
   ```
   
   Here are some annotated query examples showing the list operations:
   ```sql
   > create external table test_table
   stored as parquet location '/tmp/test_table/';
   
   > select count(*) from test_table;
   +----------+
   | count(*) |
   +----------+
   | 6        |
   +----------+
   1 row(s) fetched.
   Elapsed 0.007 seconds.
   
   Object Store Profiling
   Instrumented Object Store: instrument_mode: Trace, inner: 
LocalFileSystem(file:///)
   # This list call is executed, rediscovering partition 'a'
   2025-10-22T21:40:58.237088151+00:00 operation=List duration=0.000294s 
path=tmp/test_table
   # ----
   # These 3 list calls are executed in parallel, rediscovering all of the 
partitions at the 2nd level, in this case 'b'
   2025-10-22T21:40:58.237397741+00:00 operation=List duration=0.000081s 
path=tmp/test_table/a=1
   2025-10-22T21:40:58.237414558+00:00 operation=List duration=0.000069s 
path=tmp/test_table/a=2
   2025-10-22T21:40:58.237436985+00:00 operation=List duration=0.000101s 
path=tmp/test_table/a=3
   # ---
   # Then the 'b' partitions are listed in parallel, rediscovering the 'c' 
partitions
   2025-10-22T21:40:58.237487175+00:00 operation=List duration=0.000056s 
path=tmp/test_table/a=1/b=10
   2025-10-22T21:40:58.237513848+00:00 operation=List duration=0.000058s 
path=tmp/test_table/a=2/b=20
   # Then the 'c' partitions are listed in parallel, finally discovering 
readable files for this table.
   # Note that, while the 'c' partition directly following this comment is 
returned prior to a 'b' partition, the timestamps 
   # indicate the list call for the 'b' partition was submitted first.
   2025-10-22T21:40:58.237576223+00:00 operation=List duration=0.000047s 
path=tmp/test_table/a=2/b=20/c=200
   2025-10-22T21:40:58.237548133+00:00 operation=List duration=0.000080s 
path=tmp/test_table/a=3/b=30
   2025-10-22T21:40:58.237560094+00:00 operation=List duration=0.000088s 
path=tmp/test_table/a=1/b=10/c=100
   2025-10-22T21:40:58.237631945+00:00 operation=List duration=0.000095s 
path=tmp/test_table/a=3/b=30/c=300
   # Only after the partitions are listed and files discovered does any reading 
of the actual data start
   2025-10-22T21:40:58.238183601+00:00 operation=Get duration=0.000085s size=8 
range: bytes=477-484 path=tmp/test_table/a=2/b=20/c=200/file2.parquet
   2025-10-22T21:40:58.238237666+00:00 operation=Get duration=0.000041s size=8 
range: bytes=477-484 path=tmp/test_table/a=3/b=30/c=300/file2.parquet
   . . .
   ```
   
   Next, using a simple filter on a single partition column value. Note the 
list calls are identical even though there's a filter in the query:
   ```sql
   > select count(*) from test_table where b=30;
   +----------+
   | count(*) |
   +----------+
   | 2        |
   +----------+
   1 row(s) fetched.
   Elapsed 0.018 seconds.
   
   Object Store Profiling
   Instrumented Object Store: instrument_mode: Trace, inner: 
LocalFileSystem(file:///)
   2025-10-22T21:53:51.490170215+00:00 operation=List duration=0.000815s 
path=tmp/test_table
   2025-10-22T21:53:51.491027338+00:00 operation=List duration=0.000355s 
path=tmp/test_table/a=1
   2025-10-22T21:53:51.491104157+00:00 operation=List duration=0.000355s 
path=tmp/test_table/a=2
   2025-10-22T21:53:51.491261191+00:00 operation=List duration=0.000211s 
path=tmp/test_table/a=3
   2025-10-22T21:53:51.491484012+00:00 operation=List duration=0.000184s 
path=tmp/test_table/a=2/b=20
   2025-10-22T21:53:51.491405857+00:00 operation=List duration=0.000360s 
path=tmp/test_table/a=1/b=10
   2025-10-22T21:53:51.491523030+00:00 operation=List duration=0.000259s 
path=tmp/test_table/a=3/b=30
   2025-10-22T21:53:51.491698312+00:00 operation=List duration=0.000181s 
path=tmp/test_table/a=2/b=20/c=200
   2025-10-22T21:53:51.491793944+00:00 operation=List duration=0.000363s 
path=tmp/test_table/a=1/b=10/c=100
   2025-10-22T21:53:51.491833864+00:00 operation=List duration=0.000350s 
path=tmp/test_table/a=3/b=30/c=300
   ```
   
   This query shows where I believe there is a potential performance 
_regression_ with this PR exactly as written. This shows the existing code 
pruning list operations when the filter can be evaluated against the known 
partition columns.
   ```sql
   > select count(*) from test_table where a=3 and b=30;
   +----------+
   | count(*) |
   +----------+
   | 2        |
   +----------+
   1 row(s) fetched.
   Elapsed 0.012 seconds.
   
   Object Store Profiling
   Instrumented Object Store: instrument_mode: Trace, inner: 
LocalFileSystem(file:///)
   2025-10-22T21:57:58.229995504+00:00 operation=List duration=0.000373s 
path=tmp/test_table/a=3/b=30
   2025-10-22T21:57:58.230384839+00:00 operation=List duration=0.000146s 
path=tmp/test_table/a=3/b=30/c=300
   ```
   However, the above optimization _only_ applies when the full column path 
from the beginning of the table structure is present, as the following query 
goes back to listing every directory in the table.
   ```sql
   > select count(*) from test_table where b=20 and c=200;
   +----------+
   | count(*) |
   +----------+
   | 2        |
   +----------+
   1 row(s) fetched.
   Elapsed 0.013 seconds.
   
   Object Store Profiling
   Instrumented Object Store: instrument_mode: Trace, inner: 
LocalFileSystem(file:///)
   2025-10-22T22:01:02.257613181+00:00 operation=List duration=0.000386s 
path=tmp/test_table
   2025-10-22T22:01:02.258016229+00:00 operation=List duration=0.000196s 
path=tmp/test_table/a=1
   2025-10-22T22:01:02.258038911+00:00 operation=List duration=0.000181s 
path=tmp/test_table/a=2
   2025-10-22T22:01:02.258176544+00:00 operation=List duration=0.000046s 
path=tmp/test_table/a=3
   2025-10-22T22:01:02.258240850+00:00 operation=List duration=0.000043s 
path=tmp/test_table/a=2/b=20
   2025-10-22T22:01:02.258250880+00:00 operation=List duration=0.000052s 
path=tmp/test_table/a=3/b=30
   2025-10-22T22:01:02.258226660+00:00 operation=List duration=0.000080s 
path=tmp/test_table/a=1/b=10
   2025-10-22T22:01:02.258288622+00:00 operation=List duration=0.000045s 
path=tmp/test_table/a=2/b=20/c=200
   2025-10-22T22:01:02.258310145+00:00 operation=List duration=0.000035s 
path=tmp/test_table/a=3/b=30/c=300
   2025-10-22T22:01:02.258321461+00:00 operation=List duration=0.000039s 
path=tmp/test_table/a=1/b=10/c=100
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to