This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new be577eeac6 Core: Maintain passed in ordering of files in Manifest Lists (#13411)
be577eeac6 is described below
commit be577eeac631d77243beb57409e476bf197f79d7
Author: Russell Spitzer <[email protected]>
AuthorDate: Tue Jul 1 12:44:34 2025 -0500
Core: Maintain passed in ordering of files in Manifest Lists (#13411)
---
.../java/org/apache/iceberg/AppendBenchmark.java | 1 +
.../java/org/apache/iceberg/SnapshotProducer.java | 32 ++++++++++++++++++----
jmh.gradle | 1 +
3 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
index a444e7ff9c..57a5c2d3a1 100644
--- a/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/AppendBenchmark.java
@@ -86,6 +86,7 @@ public class AppendBenchmark {
@Setup
public void setupBenchmark() {
+ dropTable();
initTable();
initDataFiles();
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 118ae0b328..77cdac8f4a 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -40,12 +40,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -69,10 +69,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.util.Exceptions;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
@@ -669,13 +669,33 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
Collection<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
List<List<F>> groups = divide(files, parallelism);
- Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
- Tasks.foreach(groups)
+
+ // Create a new list pairing each group with its index
+ List<Pair<Integer, List<F>>> groupsWithIndex = Lists.newArrayList();
+ for (int i = 0; i < groups.size(); i++) {
+ groupsWithIndex.add(Pair.of(i, groups.get(i)));
+ }
+
+ AtomicReferenceArray<List<ManifestFile>> results = new AtomicReferenceArray<>(groups.size());
+
+ Tasks.foreach(groupsWithIndex)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
- .run(group -> manifests.addAll(writeFunc.apply(group)));
- return ImmutableList.copyOf(manifests);
+ .run(
+ indexedGroup -> {
+ int index = indexedGroup.first();
+ List<F> group = indexedGroup.second();
+ List<ManifestFile> groupResults = writeFunc.apply(group);
+ results.set(index, groupResults);
+ });
+
+ // Collect results in order
+ ImmutableList.Builder<ManifestFile> builder = ImmutableList.builder();
+ for (int i = 0; i < results.length(); i++) {
+ builder.addAll(results.get(i));
+ }
+ return builder.build();
}
private static <T> List<List<T>> divide(Collection<T> collection, int groupCount) {
diff --git a/jmh.gradle b/jmh.gradle
index 4d6d7207c5..7f3bc0deaf 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -66,6 +66,7 @@ configure(jmhProjects) {
forceGC = true
includeTests = true
humanOutputFile = file(jmhOutputPath)
+ jvmArgs = ['-Xmx32g']
resultsFile = file(jmhJsonOutputPath)
resultFormat = 'JSON'
includes = [jmhIncludeRegex]