deniskuzZ commented on code in PR #5957:
URL: https://github.com/apache/hive/pull/5957#discussion_r2213799941
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java:
##########

@@ -165,4 +189,76 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
     ci.type = compactionEvaluator.determineCompactionType();
     compactions.add(ci);
   }
+
+  /**
+   * Finds all unique non-compaction-modified partitions (with added or deleted files) between a given past
+   * snapshot timestamp and the table's current (latest) snapshot.
+   * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table} instance to inspect.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
+   * @param latestSpecOnly when True, returns partitions with the current spec only;
+   *                       False - older specs only;
+   *                       Null - any spec
+   * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified
+   *         partitions.
+   * @throws IllegalArgumentException if snapshot IDs are invalid or out of order, or if the table has no current
+   *         snapshot.
+   */
+  private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
+      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean latestSpecOnly) {
+
+    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
+    if (relevantSnapshots.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+      // Submit a task for each snapshot and collect the Futures
+      List<Future<Set<String>>> futures = relevantSnapshots.stream()
+          .map(snapshot -> executor.submit(() -> {
+            FileIO io = icebergTable.io();
+            List<ContentFile<?>> affectedFiles = FluentIterable.<ContentFile<?>>concat(
+                snapshot.addedDataFiles(io),
+                snapshot.removedDataFiles(io),
+                snapshot.addedDeleteFiles(io),
+                snapshot.removedDeleteFiles(io))
+                .toList();
+            return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
+          }))
+          .toList();
+
+      // Collect the results from all completed futures
+      Set<String> modifiedPartitions = Sets.newHashSet();
+      for (Future<Set<String>> future : futures) {
+        modifiedPartitions.addAll(future.get());
+      }
+
+      return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions);
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeMetaException(e, "Failed to find modified partitions in parallel");
+    }
+  }
+
+  /**
+   * Checks if a table has had new commits since a given snapshot that were not caused by compaction.
+   * @param icebergTable The Iceberg table to check.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
+   * @return true if at least one snapshot whose source is not compaction exists after pastSnapshotTimeMil,
+   *         false otherwise.
+   */
+  private boolean hasNewCommits(org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil) {
+    return getRelevantSnapshots(icebergTable, pastSnapshotTimeMil)
+        .findAny().isPresent();
+  }
+
+  private Stream<Snapshot> getRelevantSnapshots(org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil) {
+    Snapshot currentSnapshot = icebergTable.currentSnapshot();
+    if (currentSnapshot == null || Objects.equals(currentSnapshot.timestampMillis(), pastSnapshotTimeMil)) {
+      return Stream.empty();
+    }
+
+    return StreamSupport.stream(icebergTable.snapshots().spliterator(), false)

Review Comment:
   why do you need `StreamSupport.stream(icebergTable.snapshots().spliterator()` ? `icebergTable.snapshots().stream()` is not good?
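[Editor's note on the question above: a minimal sketch, not part of the PR. It assumes `org.apache.iceberg.Table#snapshots()` returns `Iterable<Snapshot>` (as in current Iceberg releases); plain `Iterable` has no `stream()` method, so a Stream has to be built explicitly, either via `StreamSupport` as in the patch or via Guava's `Streams.stream(Iterable)` if that utility is available on the classpath. The class and method names below are illustrative only.]

```java
// Illustrative sketch only (not part of the PR). Assumes Table#snapshots()
// returns Iterable<Snapshot>, which does not expose a stream() method.
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class SnapshotStreams {

  // Form used in the patch: build a sequential Stream from the Iterable's spliterator.
  static Stream<Snapshot> viaStreamSupport(Table table) {
    return StreamSupport.stream(table.snapshots().spliterator(), false);
  }

  // Equivalent shorter form, assuming Guava's Streams utility is available.
  static Stream<Snapshot> viaGuava(Table table) {
    return com.google.common.collect.Streams.stream(table.snapshots());
  }
}
```

Both forms produce the same sequential Stream; the `StreamSupport` variant simply avoids pulling in Guava's `Streams` helper.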
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org