difin commented on code in PR #5957:
URL: https://github.com/apache/hive/pull/5957#discussion_r2214412566


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java:
##########
@@ -165,4 +189,76 @@ private void addCompactionTargetIfEligible(Table table, 
org.apache.iceberg.Table
     ci.type = compactionEvaluator.determineCompactionType();
     compactions.add(ci);
   }
+
+  /**
+   * Finds all unique non-compaction-modified partitions (with added or 
deleted files) between a given past
+   * snapshot ID and the table's current (latest) snapshot.
+   * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table} 
instance to inspect.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot 
to check from (exclusive).
+   * @param latestSpecOnly when True, returns partitions with the current spec 
only;
+   *                       False - older specs only;
+   *                       Null - any spec
+   * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} 
representing the unique modified
+   *                       partition names.
+   * @throws IllegalArgumentException if snapshot IDs are invalid or out of 
order, or if the table has no current
+   *                       snapshot.
+   */
+  private List<Partition> 
findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
+      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean 
latestSpecOnly) {
+
+    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, 
pastSnapshotTimeMil).toList();
+    if (relevantSnapshots.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try (ExecutorService executor = 
Executors.newVirtualThreadPerTaskExecutor()) {
+      // Submit a task for each snapshot and collect the Futures
+      List<Future<Set<String>>> futures = relevantSnapshots.stream()
+          .map(snapshot -> executor.submit(() -> {
+            FileIO io = icebergTable.io();
+            List<ContentFile<?>> affectedFiles = 
FluentIterable.<ContentFile<?>>concat(
+                    snapshot.addedDataFiles(io),
+                    snapshot.removedDataFiles(io),
+                    snapshot.addedDeleteFiles(io),
+                    snapshot.removedDeleteFiles(io))
+                .toList();
+            return IcebergTableUtil.getPartitionNames(icebergTable, 
affectedFiles, latestSpecOnly);
+          }))
+          .toList();
+
+      // Collect the results from all completed futures
+      Set<String> modifiedPartitions = Sets.newHashSet();
+      for (Future<Set<String>> future : futures) {
+        modifiedPartitions.addAll(future.get());
+      }
+
+      return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, 
modifiedPartitions);
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeMetaException(e, "Failed to find modified partitions in 
parallel");
+    }
+  }
+
+  /**
+   * Checks if a table has had new commits since a given snapshot that were 
not caused by compaction.
+   * @param icebergTable The Iceberg table to check.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot 
to check from (exclusive).
+   * @return true if at least one non-compaction snapshot exists since the 
pastSnapshotTimeMil
+   * whose source is not compaction, false otherwise.
+   */
+  private boolean hasNewCommits(org.apache.iceberg.Table icebergTable, Long 
pastSnapshotTimeMil) {
+    return getRelevantSnapshots(icebergTable, pastSnapshotTimeMil)
+        .findAny().isPresent();
+  }
+
+  private Stream<Snapshot> getRelevantSnapshots(org.apache.iceberg.Table 
icebergTable, Long pastSnapshotTimeMil) {
+    Snapshot currentSnapshot = icebergTable.currentSnapshot();
+    if (currentSnapshot == null || 
Objects.equals(currentSnapshot.timestampMillis(), pastSnapshotTimeMil)) {
+      return Stream.empty();
+    }
+
+    return StreamSupport.stream(icebergTable.snapshots().spliterator(), false)

Review Comment:
   `icebergTable.snapshots()` returns `Iterable<Snapshot>` and it doesn't have` 
.stream()` method..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to