rdblue commented on a change in pull request #1098: URL: https://github.com/apache/iceberg/pull/1098#discussion_r437646185
########## File path: core/src/main/java/org/apache/iceberg/ManifestMergeManager.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.ManifestEntry.Status; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.util.BinPacking.ListPacker; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; + +abstract class ManifestMergeManager<F extends ContentFile<F>> { + private final long targetSizeBytes; + private final int minCountToMerge; + private final boolean mergeEnabled; + + // cache merge results to reuse when retrying + 
private final Map<List<ManifestFile>, ManifestFile> mergedManifests = Maps.newConcurrentMap(); + + ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean mergeEnabled) { + this.targetSizeBytes = targetSizeBytes; + this.minCountToMerge = minCountToMerge; + this.mergeEnabled = mergeEnabled; + } + + protected abstract long snapshotId(); + protected abstract PartitionSpec spec(int specId); + protected abstract void deleteFile(String location); + protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec); + protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest); + + Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) { + Iterator<ManifestFile> manifestIter = manifests.iterator(); + if (!mergeEnabled || !manifestIter.hasNext()) { + return manifests; + } + + ManifestFile first = manifestIter.next(); + + List<ManifestFile> merged = Lists.newArrayList(); + ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, manifestIter); + for (Integer specId : groups.keySet()) { + Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId))); + } + + return merged; + } + + void cleanUncommitted(Set<ManifestFile> committed) { + // iterate over a copy of entries to avoid concurrent modification + List<Map.Entry<List<ManifestFile>, ManifestFile>> entries = + Lists.newArrayList(mergedManifests.entrySet()); + + for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) { + // delete any new merged manifests that aren't in the committed list + ManifestFile merged = entry.getValue(); + if (!committed.contains(merged)) { + deleteFile(merged.path()); + // remove the deleted file from the cache + mergedManifests.remove(entry.getKey()); + } + } + } + + private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, Iterator<ManifestFile> remaining) { + ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap( + Maps.newTreeMap(Comparator.<Integer>reverseOrder()), + 
Lists::newArrayList); + groups.put(first.partitionSpecId(), first); + remaining.forEachRemaining(manifest -> groups.put(manifest.partitionSpecId(), manifest)); + return groups; + } + + @SuppressWarnings("unchecked") + private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, List<ManifestFile> group) { + // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack + // from the end so that the manifest that gets under-filled is the first one, which will be + // merged the next time. + ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, false); + List<List<ManifestFile>> bins = packer.packEnd(group, ManifestFile::length); + + // process bins in parallel, but put results in the order of the bins into an array to preserve + // the order of manifests and contents. preserving the order helps avoid random deletes when + // data files are eventually aged off. + List<ManifestFile>[] binResults = (List<ManifestFile>[]) + Array.newInstance(List.class, bins.size()); + + Tasks.range(bins.size()) + .stopOnFailure().throwFailureWhenFinished() + .executeWith(ThreadPools.getWorkerPool()) + .run(index -> { + List<ManifestFile> bin = bins.get(index); + List<ManifestFile> outputManifests = Lists.newArrayList(); + binResults[index] = outputManifests; + + if (bin.size() == 1) { + // no need to rewrite + outputManifests.add(bin.get(0)); + return; + } + + // if the bin has the first manifest (the new data files or an appended manifest file) then only merge it + // if the number of manifests is above the minimum count. this is applied only to bins with an in-memory + // manifest so that large manifests don't prevent merging older groups. + if (bin.contains(first) && bin.size() < minCountToMerge) { Review comment: `m1` is used to detect the first bin, so it would not be merged because it only has 2 (< `minCount`) manifests. The other bin would be merged because it isn't the first. 
Overall, the behavior is the same as before. The difference is that we can now detect the first bin even when there are no appended manifests. In your example, that commit would end with `[m1, m2, m7]`, where `m7` is the merged bin. A delete commit after that would produce bins `[(m1, m2), (m7)]`. Previously, the first bin, `(m1, m2)`, would have been merged in that commit, because there was no appended manifest to identify it as the first bin. Now we use the first manifest, `m1`, to identify the first bin, so it is not merged (it has fewer than `minCount` manifests). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
