sreejasahithi commented on code in PR #10306:
URL: https://github.com/apache/ozone/pull/10306#discussion_r3281240596
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -675,4 +695,188 @@ private static RewriteResult<DeleteFile>
writeDeleteManifest(
throw new RuntimeIOException(e);
}
}
+
+ static class OzonePositionDeleteReaderWriter implements
RewriteTablePathUtil.PositionDeleteReaderWriter {
+ @Override
+ public CloseableIterable<Record> reader(
+ InputFile inputFile, FileFormat format, PartitionSpec spec) {
+ return positionDeletesReader(inputFile, format, spec);
+ }
+
+ @Override
+ public PositionDeleteWriter<Record> writer(
+ OutputFile outputFile,
+ FileFormat format,
+ PartitionSpec spec,
+ StructLike partition,
+ Schema rowSchema)
+ throws IOException {
+ return positionDeletesWriter(outputFile, format, spec, partition,
rowSchema);
+ }
+ }
+
+ private void rewritePositionDeletes(Set<DeleteFile> toRewrite) {
+ /*
+ * NOTE: Rewriting position delete files updates embedded data file paths,
which changes the
+ * resulting file size. This causes a metadata mismatch in the manifests:
+ *
+ * 1. Dependency: Manifests MUST be rewritten first because they are the
source of truth used to identify which
+ * position delete files exist and need processing.
+ * 2. Issue: Because manifests are written before the delete files are
updated, the 'file_size_in_bytes' field
+ * in the manifest reflects the original size, not the new size.
+ * 3. Impact: Some catalogs (e.g., REST catalogs like Polaris) will fail
to read these files as the reader uses
+ * the stale size from the manifest.
+ *
+ * This is a known Iceberg limitation being addressed by the Iceberg
community. Once that fix is available
+ * in the Iceberg core, this action should be updated accordingly.
+ */
+ if (toRewrite.isEmpty()) {
+ return;
+ }
+
+ RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter =
new OzonePositionDeleteReaderWriter();
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+ ExecutorCompletionService<Void> completionService = new
ExecutorCompletionService<>(executorService);
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (DeleteFile deleteFile : toRewrite) {
+ semaphore.acquire();
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try {
+ rewritePositionDelete(deleteFile, table, sourcePrefix,
targetPrefix, stagingDir, posDeleteReaderWriter);
+ return null;
+ } finally {
+ semaphore.release();
+ }
+ });
+ taskSubmitted = true;
+ submittedTasks++;
+ } finally {
+ if (!taskSubmitted) {
+ semaphore.release();
+ }
+ }
Review Comment:
The `Semaphore` + `ExecutorCompletionService` pattern here is intentional.
The reasoning is as follows:
`Executors.newFixedThreadPool` uses an unbounded `LinkedBlockingQueue`
internally. While the thread count limits concurrent execution, all submitted
tasks are still queued immediately. For a table with a very large number of
position delete files, submitting everything upfront would create a task
wrapper per file and can lead to significant memory pressure.
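To make the queueing behaviour concrete, here is a minimal sketch (not code
from this PR; the class and method names are made up for illustration). It
builds a pool the same way `Executors.newFixedThreadPool` does, keeps the
workers blocked, and reports how many submitted tasks are sitting in the queue:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
class UnboundedQueueDemo {

  /** Submits taskCount blocked tasks and returns how many are waiting in the queue. */
  static int queuedAfterSubmittingAll(int threads, int taskCount)
      throws InterruptedException {
    // Same construction that Executors.newFixedThreadPool(threads) uses:
    // core == max threads, backed by an unbounded LinkedBlockingQueue.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    CountDownLatch gate = new CountDownLatch(1);
    try {
      for (int i = 0; i < taskCount; i++) {
        pool.submit(() -> {
          gate.await(); // hold every worker so no task completes yet
          return null;
        });
      }
      // submit() never blocked: everything the workers could not pick up is queued.
      return pool.getQueue().size();
    } finally {
      gate.countDown(); // let the tasks finish
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("queued = " + queuedAfterSubmittingAll(2, 1000));
  }
}
```

The queue holds every task that is not currently running on a worker, so its
size grows with the number of position delete files rather than with the
thread count.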
The semaphore provides explicit backpressure: at any point in time, at most
`parallelism * MAX_INFLIGHT_MULTIPLIER` tasks are admitted into the executor.
Polling the `ExecutorCompletionService` inside the submission loop lets us
observe completions and detect failures early, so we can stop submitting work
as soon as a task fails instead of enqueueing the full workload and only
discovering the error during the drain phase.
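Stripped of the Iceberg specifics, the pattern can be sketched as below. This
is a simplified illustration under my own naming (`BoundedSubmitSketch`,
`runAll`), not the exact code in the PR:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

// Hypothetical helper, not part of the PR.
class BoundedSubmitSketch {

  /** Runs all tasks with at most maxInFlight admitted; returns the number submitted. */
  static int runAll(ExecutorService pool, List<Callable<Void>> tasks, int maxInFlight)
      throws InterruptedException, ExecutionException {
    Semaphore permits = new Semaphore(maxInFlight);
    ExecutorCompletionService<Void> done = new ExecutorCompletionService<>(pool);
    int submitted = 0;
    int completed = 0;

    for (Callable<Void> task : tasks) {
      permits.acquire(); // blocks once maxInFlight tasks are outstanding
      boolean taskSubmitted = false;
      try {
        done.submit(() -> {
          try {
            return task.call();
          } finally {
            permits.release(); // free the slot whether the task succeeded or failed
          }
        });
        taskSubmitted = true;
        submitted++;
      } finally {
        if (!taskSubmitted) {
          permits.release(); // submit() itself failed, so give the permit back
        }
      }

      // Non-blocking poll: a failed task surfaces here as an ExecutionException,
      // which stops the loop before the remaining tasks are submitted.
      Future<Void> finished;
      while ((finished = done.poll()) != null) {
        finished.get();
        completed++;
      }
    }

    // Drain phase: wait for everything that is still in flight.
    while (completed < submitted) {
      done.take().get();
      completed++;
    }
    return submitted;
  }
}
```

The caller stays responsible for the executor's lifecycle. This sketch also
leaves out cancelling in-flight tasks after a failure.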
A bounded `ThreadPoolExecutor` with an `ArrayBlockingQueue` is an
alternative, but it does not eliminate the need for backpressure control or
result tracking. When the queue fills, behaviour depends on the rejection
policy:
* `CallerRunsPolicy` changes execution semantics by running the rejected task
on the submitting thread. That stalls submission while the task runs, which
naturally slows down the rate of new task submissions.
* A custom `RejectedExecutionHandler` that blocks until queue space is
available provides backpressure similar to `Semaphore.acquire()`, but moves
the control flow into executor internals rather than keeping it explicit at
the call site.
In both cases, `ExecutorCompletionService` is still required for result
collection and error propagation. The semaphore keeps the backpressure and
concurrency limits explicit and visible in the code, rather than implicit in
executor configuration.
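For comparison, a rough sketch of the bounded-queue alternative with a
blocking rejection handler. This is an illustration only, with hypothetical
names (`BlockingHandlerSketch`, `newBoundedPool`, `sumResults`), and it is not
what the PR does. Note that the `ExecutorCompletionService` is still there for
result collection:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative design, shown only for comparison.
class BlockingHandlerSketch {

  static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
    // Instead of rejecting, block the submitting thread until the queue has room.
    RejectedExecutionHandler blockWhenFull = (task, executor) -> {
      if (executor.isShutdown()) {
        throw new RejectedExecutionException("executor has been shut down");
      }
      try {
        executor.getQueue().put(task); // backpressure is hidden inside the executor
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("interrupted while waiting for queue space", e);
      }
    };
    return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity), blockWhenFull);
  }

  /** Runs all tasks on a bounded pool and sums their results. */
  static int sumResults(List<Callable<Integer>> tasks, int threads, int queueCapacity)
      throws InterruptedException, ExecutionException {
    ThreadPoolExecutor pool = newBoundedPool(threads, queueCapacity);
    ExecutorCompletionService<Integer> done = new ExecutorCompletionService<>(pool);
    try {
      for (Callable<Integer> task : tasks) {
        done.submit(task); // may block inside the rejection handler when the queue is full
      }
      int sum = 0;
      for (int i = 0; i < tasks.size(); i++) {
        sum += done.take().get(); // result and error propagation still go through here
      }
      return sum;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

The admission limit works, but it now depends on the queue capacity and the
handler wired into the executor, so a reader of the submission loop cannot see
it without looking at how the pool was built.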