dramaticlly commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147038788


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // handle partition evolution: map the current spec's partition fields into the normalized partition type
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // read data files, parallelizing the manifest reads via planDataFiles
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read the list of data manifests from the current snapshot obtained via the scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid reading stats columns for the partitions table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)
+        ? new ParallelIterable<>(tasks, scan.planExecutor())
+        : CloseableIterable.concat(tasks);
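
For context on the evalCache block above, here is a minimal, self-contained sketch of the same Caffeine LoadingCache pattern. It is not Iceberg code; the class name, values, and counter are invented for illustration. The point it demonstrates is that the loader runs once per distinct key (the partition spec ID in the diff) and later lookups reuse the cached evaluator.

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.concurrent.atomic.AtomicInteger;

public class SpecCacheSketch {
  public static void main(String[] args) {
    AtomicInteger loads = new AtomicInteger();

    // Stand-in for ManifestEvaluator.forRowFilter(...): an expensive value built per spec ID.
    LoadingCache<Integer, String> evalCache =
        Caffeine.newBuilder()
            .build(specId -> {
              loads.incrementAndGet();
              return "evaluator-for-spec-" + specId;
            });

    // Repeated lookups for the same spec ID reuse the cached value instead of rebuilding it.
    evalCache.get(0);
    evalCache.get(0);
    evalCache.get(1);

    System.out.println("loader invocations: " + loads.get()); // prints 2
  }
}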

Review Comment:
   Looks like upstream always sets this in https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/TableScanContext.java#L333, so it's not null. Updated to use ParallelIterable.
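
For illustration: assuming scan.planExecutor() is indeed always non-null, as the linked TableScanContext line suggests, the null check and CloseableIterable.concat fallback can collapse to the ParallelIterable branch alone. The sketch below mimics that fan-out with a plain ExecutorService rather than Iceberg's ParallelIterable (which needs a real table to exercise); every name in it is invented for the example.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelPlanSketch {
  public static void main(String[] args) throws Exception {
    // Each inner list stands in for the data files read out of one manifest.
    List<List<String>> perManifestFiles =
        List.of(List.of("a.parquet", "b.parquet"), List.of("c.parquet"));

    // Stand-in for scan.planExecutor(): always present, so no sequential concat fallback.
    ExecutorService workerPool = Executors.newFixedThreadPool(2);
    try {
      // Submit one read task per manifest, mirroring how ParallelIterable fans the work
      // out to the worker pool instead of walking the manifests one after another.
      List<Future<List<String>>> pending = new ArrayList<>();
      for (List<String> manifest : perManifestFiles) {
        pending.add(workerPool.submit(() -> new ArrayList<>(manifest)));
      }

      List<String> allFiles = new ArrayList<>();
      for (Future<List<String>> files : pending) {
        allFiles.addAll(files.get());
      }
      System.out.println(allFiles); // [a.parquet, b.parquet, c.parquet]
    } finally {
      workerPool.shutdown();
    }
  }
}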


