ashishkumar50 commented on code in PR #10145:
URL: https://github.com/apache/ozone/pull/10145#discussion_r3158697855
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -251,4 +271,109 @@ private Set<Pair<String, String>>
rewriteVersionFile(TableMetadata metadata, Str
return result;
}
+
+ private Set<String> manifestsToRewrite(Set<Snapshot> deltaSnapshots,
TableMetadata startMetadata) {
+ Table endStaticTable =
RewriteTablePathOzoneUtils.newStaticTable(endVersionName, table.io());
+
+ final Set<Long> deltaSnapshotIds;
+ if (startMetadata != null) {
Review Comment:
The entire `startMetadata` object is passed, but it is only null-checked
(`startMetadata != null`). The actual snapshot data is already captured in
`deltaSnapshots`.
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -251,4 +271,109 @@ private Set<Pair<String, String>>
rewriteVersionFile(TableMetadata metadata, Str
return result;
}
+
+ private Set<String> manifestsToRewrite(Set<Snapshot> deltaSnapshots,
TableMetadata startMetadata) {
+ Table endStaticTable =
RewriteTablePathOzoneUtils.newStaticTable(endVersionName, table.io());
+
+ final Set<Long> deltaSnapshotIds;
+ if (startMetadata != null) {
+ deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ } else {
+ deltaSnapshotIds = null;
+ }
+
+ Set<String> manifestPaths = ConcurrentHashMap.newKeySet();
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+
+ ExecutorCompletionService<Void> completionService = new
ExecutorCompletionService<>(executorService);
+
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (Snapshot snapshot : endStaticTable.snapshots()) {
+ semaphore.acquire(); // blocks when maxInFlight tasks are already
in-flight
+
+ final long snapshotId = snapshot.snapshotId();
+ final String manifestListLocation = snapshot.manifestListLocation();
+
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try (CloseableIterable<ManifestFile> manifests =
+ InternalData.read(
+ FileFormat.AVRO,
+ table.io().newInputFile(manifestListLocation))
+ .setRootType(GenericManifestFile.class)
+ .setCustomType(
+ ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
+ GenericPartitionFieldSummary.class)
+ .project(ManifestFile.schema())
+ .build()) {
+
+ for (ManifestFile manifest : manifests) {
+ if (deltaSnapshotIds == null) {
+ manifestPaths.add(manifest.path());
+ } else if (manifest.snapshotId() != null
Review Comment:
Can legacy or old Iceberg manifests have `snapshotId() == null`?
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -205,6 +223,8 @@ private String rebuildMetadata() {
}
RewriteResult<Snapshot> rewriteVersionResult =
rewriteVersionFiles(endMetadata);
+ Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata,
rewriteVersionResult.toRewrite());
+ Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots,
startMetadata);
Review Comment:
Add a TODO here describing what `manifestsToRewrite` will be used for.
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -57,11 +69,15 @@ public class RewriteTablePathOzoneAction implements
RewriteTablePath {
private String stagingDir;
private int parallelism;
+ private ExecutorService executorService;
+ private static final int MAX_INFLIGHT_MULTIPLIER = 4;
+ private static final int DEFAULT_THREAD_COUNT = 10;
+
private final Table table;
public RewriteTablePathOzoneAction(Table table) {
this.table = table;
- this.parallelism = Runtime.getRuntime().availableProcessors();
+ this.parallelism = DEFAULT_THREAD_COUNT;
Review Comment:
The thread count could be passed in via the command; this can be done in a
subsequent PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]