sreejasahithi commented on code in PR #10145:
URL: https://github.com/apache/ozone/pull/10145#discussion_r3159905459
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -251,4 +271,109 @@ private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, Str
return result;
}
+
+ private Set<String> manifestsToRewrite(Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
+ Table endStaticTable = RewriteTablePathOzoneUtils.newStaticTable(endVersionName, table.io());
+
+ final Set<Long> deltaSnapshotIds;
+ if (startMetadata != null) {
+ deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ } else {
+ deltaSnapshotIds = null;
+ }
+
+ Set<String> manifestPaths = ConcurrentHashMap.newKeySet();
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+
+ ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
+
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (Snapshot snapshot : endStaticTable.snapshots()) {
+ semaphore.acquire(); // blocks when maxInFlight tasks are already in-flight
+
+ final long snapshotId = snapshot.snapshotId();
+ final String manifestListLocation = snapshot.manifestListLocation();
+
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try (CloseableIterable<ManifestFile> manifests =
+ InternalData.read(
+ FileFormat.AVRO,
+ table.io().newInputFile(manifestListLocation))
+ .setRootType(GenericManifestFile.class)
+ .setCustomType(
+ ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
+ GenericPartitionFieldSummary.class)
+ .project(ManifestFile.schema())
+ .build()) {
+
+ for (ManifestFile manifest : manifests) {
+ if (deltaSnapshotIds == null) {
+ manifestPaths.add(manifest.path());
+ } else if (manifest.snapshotId() != null
Review Comment:
For any correctly written manifest list file, snapshotId() will never be null, so the null check here is purely defensive.
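A minimal sketch of the filtering rule under discussion, assuming the elided remainder of the condition on the last diff line is a membership test against deltaSnapshotIds. DefensiveSnapshotFilter and shouldRewrite are hypothetical names for illustration, not code from the PR. It shows that the guard only changes behaviour for a malformed manifest list: a null snapshot ID is skipped instead of being passed to contains(), which some Set implementations (for example those returned by Set.of) reject with a NullPointerException.

```java
import java.util.Set;

public class DefensiveSnapshotFilter {

  /**
   * Hypothetical helper: decides whether a manifest should be rewritten.
   * A null deltaSnapshotIds means there is no start version, so every manifest is rewritten.
   * Otherwise only manifests written by one of the delta snapshots qualify.
   */
  static boolean shouldRewrite(Long manifestSnapshotId, Set<Long> deltaSnapshotIds) {
    if (deltaSnapshotIds == null) {
      return true; // full rewrite: no delta filtering
    }
    // Defensive: a correctly written manifest list always records a snapshot ID,
    // but a null is skipped here rather than reaching contains(), which
    // null-hostile sets (e.g. from Set.of) reject with NullPointerException.
    return manifestSnapshotId != null && deltaSnapshotIds.contains(manifestSnapshotId);
  }

  public static void main(String[] args) {
    Set<Long> delta = Set.of(101L, 102L);
    System.out.println(shouldRewrite(101L, delta)); // true
    System.out.println(shouldRewrite(999L, delta)); // false
    System.out.println(shouldRewrite(null, delta)); // false, no exception
    System.out.println(shouldRewrite(null, null));  // true: full rewrite
  }
}
```

Without the guard, the third call would throw with a Set.of-backed set, while a HashSet (the usual result of Collectors.toSet()) would simply return false; the check makes the outcome independent of which Set implementation is used.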