dramaticlly commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147038788
##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
+    // handle partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read the list of data manifests from the snapshot obtained via the scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid reading stats columns for the partitions table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)
+        ? new ParallelIterable<>(tasks, scan.planExecutor())
+        : CloseableIterable.concat(tasks);

Review Comment:
   Looks like upstream always sets this in https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/TableScanContext.java#L333, so it's never null. Updated to use ParallelIterable.
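   For illustration, here is a minimal sketch (not the final patch) of what the return statement reduces to once the null fallback is dropped, assuming, as the comment above notes, that `scan.planExecutor()` is always populated by `TableScanContext`. The class and method names (`ParallelPlanSketch`, `readInParallel`) are hypothetical:

   ```java
   import java.util.concurrent.ExecutorService;

   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.io.CloseableIterable;
   import org.apache.iceberg.util.ParallelIterable;

   class ParallelPlanSketch {
     // Fan the per-manifest iterables out onto the scan's worker pool: each
     // manifest is read as its own task and the results are merged into one
     // closeable stream, so no sequential concat fallback is needed.
     static CloseableIterable<DataFile> readInParallel(
         Iterable<CloseableIterable<DataFile>> tasks, ExecutorService planExecutor) {
       return new ParallelIterable<>(tasks, planExecutor);
     }
   }
   ```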
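   Since both methods are now `@VisibleForTesting`, a test in the `org.apache.iceberg` package can drive `planDataFiles` directly. A hypothetical sketch follows; the `"id"` filter column, the helper names, and the cast of the scan to `StaticTableScan` are assumptions for illustration, not details from this PR:

   ```java
   package org.apache.iceberg;

   import org.apache.iceberg.expressions.Expressions;
   import org.apache.iceberg.io.CloseableIterable;

   class PlanDataFilesSketch {
     // Count the data files that survive manifest-level pruning for a filter.
     // MetadataTableUtils/MetadataTableType are the standard way to obtain the
     // partitions metadata table; at this point in the PR its newScan() is
     // backed by a StaticTableScan, hence the cast (an assumption here).
     static long countPrunedDataFiles(Table table) throws java.io.IOException {
       Table partitionsTable =
           MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
       StaticTableScan scan =
           (StaticTableScan) partitionsTable.newScan().filter(Expressions.equal("id", 1));

       long count = 0;
       try (CloseableIterable<DataFile> files = PartitionsTable.planDataFiles(scan)) {
         for (DataFile ignored : files) {
           count++;
         }
       }
       return count;
     }
   }
   ```

   A test written this way exercises the manifest pruning path (the `ManifestEvaluator` cache plus `CloseableIterable.filter`) without going through the full partitions-table row conversion.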