sreejasahithi commented on code in PR #10306:
URL: https://github.com/apache/ozone/pull/10306#discussion_r3281240596


##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -675,4 +695,188 @@ private static RewriteResult<DeleteFile> 
writeDeleteManifest(
       throw new RuntimeIOException(e);
     }
   }
+
+  static class OzonePositionDeleteReaderWriter implements 
RewriteTablePathUtil.PositionDeleteReaderWriter {
+    @Override
+    public CloseableIterable<Record> reader(
+        InputFile inputFile, FileFormat format, PartitionSpec spec) {
+      return positionDeletesReader(inputFile, format, spec);
+    }
+
+    @Override
+    public PositionDeleteWriter<Record> writer(
+        OutputFile outputFile,
+        FileFormat format,
+        PartitionSpec spec,
+        StructLike partition,
+        Schema rowSchema)
+        throws IOException {
+      return positionDeletesWriter(outputFile, format, spec, partition, 
rowSchema);
+    }
+  }
+
+  private void rewritePositionDeletes(Set<DeleteFile> toRewrite) {
+    /*
+     * NOTE: Rewriting position delete files updates embedded data file paths, 
which changes the
+     * resulting file size. This causes a metadata mismatch in the manifests:
+     *
+     * 1. Dependency: Manifests MUST be rewritten first because they are the 
source of truth used to identify which 
+     *    position delete files exist and need processing.
+     * 2. Issue: Because manifests are written before the delete files are 
updated, the'file_size_in_bytes' field 
+     *    in the manifest reflects the original size, not the new size.
+     * 3. Impact: Some catalogs (e.g., REST catalogs like Polaris) will fail 
to read these files as the reader uses
+     *    the stale size from the manifest.
+     *
+     * This is a known Iceberg limitation being addressed by the Iceberg 
community. Once that fix is available
+     * in the Iceberg core, this action should be updated accordingly.
+     */
+    if (toRewrite.isEmpty()) {
+      return;
+    }
+    
+    RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter = 
new OzonePositionDeleteReaderWriter();
+    int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+    Semaphore semaphore = new Semaphore(maxInFlight);
+    ExecutorCompletionService<Void> completionService = new 
ExecutorCompletionService<>(executorService);
+    int submittedTasks = 0;
+    int completedTasks = 0;
+
+    try {
+      for (DeleteFile deleteFile : toRewrite) {
+        semaphore.acquire();
+        boolean taskSubmitted = false;
+        try {
+          completionService.submit(() -> {
+            try {
+              rewritePositionDelete(deleteFile, table, sourcePrefix, 
targetPrefix, stagingDir, posDeleteReaderWriter);
+              return null;
+            } finally {
+              semaphore.release();
+            }
+          });
+          taskSubmitted = true;
+          submittedTasks++;
+        } finally {
+          if (!taskSubmitted) {
+            semaphore.release();
+          }
+        }

Review Comment:
   The Semaphore + ExecutorCompletionService pattern here is intentional, and 
here is the reasoning:
   
   `Executors.newFixedThreadPool` uses an unbounded `LinkedBlockingQueue` 
internally. While the thread count limits concurrent execution, all submitted 
tasks are still queued immediately. For a table with a very large number of 
position delete files, submitting everything upfront would create a task 
wrapper per file and can lead to memory pressure proportional to the number of 
files.
   
   The semaphore provides explicit backpressure and keeps the number of tasks 
admitted into the executor bounded at any point in time.
   
   The `ExecutorCompletionService` polling inside the submission loop allows us 
to opportunistically drain already completed tasks during submission. This 
helps surface task failures earlier during execution compared to waiting for a 
final drain phase after all submissions are completed.
   
   A bounded `ThreadPoolExecutor` with an `ArrayBlockingQueue` is an 
alternative, but it does not eliminate the need for backpressure control or 
result tracking. When the queue fills, behaviour depends on the rejection 
policy:
   
   * `CallerRunsPolicy` changes execution semantics by running tasks on the 
submitting thread, which can stall submission progress, this naturally slows 
down the rate of new task submissions.
   * custom RejectedExecutionHandler provides similar backpressure behavior to 
`Semaphore.acquire()`, but moves the control flow into executor internals 
rather than keeping it explicit at the call site.
   
   In both cases, `ExecutorCompletionService` is still required for result 
collection and error propagation. The semaphore keeps the backpressure and 
concurrency limits explicit and visible in the code, rather than implicit in 
executor configuration.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to