chenjunjiedada commented on a change in pull request #1098:
URL: https://github.com/apache/iceberg/pull/1098#discussion_r437173289



##########
File path: core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.util.BinPacking.ListPacker;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+abstract class ManifestMergeManager<F extends ContentFile<F>> {
+  private final long targetSizeBytes;
+  private final int minCountToMerge;
+  private final boolean mergeEnabled;
+
+  // cache merge results to reuse when retrying
+  private final Map<List<ManifestFile>, ManifestFile> mergedManifests = 
Maps.newConcurrentMap();
+
+  ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean 
mergeEnabled) {
+    this.targetSizeBytes = targetSizeBytes;
+    this.minCountToMerge = minCountToMerge;
+    this.mergeEnabled = mergeEnabled;
+  }
+
+  protected abstract long snapshotId();
+  protected abstract PartitionSpec spec(int specId);
+  protected abstract void deleteFile(String location);
+  protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+  protected abstract ManifestReader<F> newManifestReader(ManifestFile 
manifest);
+
+  Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
+    Iterator<ManifestFile> manifestIter = manifests.iterator();
+    if (!mergeEnabled || !manifestIter.hasNext()) {
+      return manifests;
+    }
+
+    ManifestFile first = manifestIter.next();
+
+    List<ManifestFile> merged = Lists.newArrayList();
+    ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, 
manifestIter);
+    for (Integer specId : groups.keySet()) {
+      Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId)));
+    }
+
+    return merged;
+  }
+
+  void cleanUncommitted(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+        Lists.newArrayList(mergedManifests.entrySet());
+
+    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
+      // delete any new merged manifests that aren't in the committed list
+      ManifestFile merged = entry.getValue();
+      if (!committed.contains(merged)) {
+        deleteFile(merged.path());
+        // remove the deleted file from the cache
+        mergedManifests.remove(entry.getKey());
+      }
+    }
+  }
+
+  private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, 
Iterator<ManifestFile> remaining) {
+    ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap(
+        Maps.newTreeMap(Comparator.<Integer>reverseOrder()),
+        Lists::newArrayList);
+    groups.put(first.partitionSpecId(), first);
+    remaining.forEachRemaining(manifest -> 
groups.put(manifest.partitionSpecId(), manifest));
+    return groups;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, 
List<ManifestFile> group) {
+    // use a lookback of 1 to avoid reordering the manifests. using 1 also 
means this should pack
+    // from the end so that the manifest that gets under-filled is the first 
one, which will be
+    // merged the next time.
+    ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, 
false);
+    List<List<ManifestFile>> bins = packer.packEnd(group, 
ManifestFile::length);
+
+    // process bins in parallel, but put results in the order of the bins into 
an array to preserve
+    // the order of manifests and contents. preserving the order helps avoid 
random deletes when
+    // data files are eventually aged off.
+    List<ManifestFile>[] binResults = (List<ManifestFile>[])
+        Array.newInstance(List.class, bins.size());
+
+    Tasks.range(bins.size())
+        .stopOnFailure().throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(index -> {
+          List<ManifestFile> bin = bins.get(index);
+          List<ManifestFile> outputManifests = Lists.newArrayList();
+          binResults[index] = outputManifests;
+
+          if (bin.size() == 1) {
+            // no need to rewrite
+            outputManifests.add(bin.get(0));
+            return;
+          }
+
+          // if the bin has the first manifest (the new data files or an 
appended manifest file) then only merge it
+          // if the number of manifests is above the minimum count. this is 
applied only to bins with an in-memory
+          // manifest so that large manifests don't prevent merging older 
groups.
+          if (bin.contains(first) && bin.size() < minCountToMerge) {

Review comment:
       Could you please help me to understand the following case?
   
   If we append 5 manifests on a table that has 1 existing manifest, and they are packed into 2 bins. Assume we set minCountToMerge to 4.
   
   [(m1, m2, m3, m4), (m5, m6)], m1-m5 are appended manifests, m6 is an 
existing table manifest.
   
   The `first` is m1, right? Then both bins are getting merged?

##########
File path: core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.util.BinPacking.ListPacker;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+abstract class ManifestMergeManager<F extends ContentFile<F>> {
+  private final long targetSizeBytes;
+  private final int minCountToMerge;
+  private final boolean mergeEnabled;
+
+  // cache merge results to reuse when retrying
+  private final Map<List<ManifestFile>, ManifestFile> mergedManifests = 
Maps.newConcurrentMap();
+
+  ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean 
mergeEnabled) {
+    this.targetSizeBytes = targetSizeBytes;
+    this.minCountToMerge = minCountToMerge;
+    this.mergeEnabled = mergeEnabled;
+  }
+
+  protected abstract long snapshotId();
+  protected abstract PartitionSpec spec(int specId);
+  protected abstract void deleteFile(String location);
+  protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+  protected abstract ManifestReader<F> newManifestReader(ManifestFile 
manifest);
+
+  Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
+    Iterator<ManifestFile> manifestIter = manifests.iterator();
+    if (!mergeEnabled || !manifestIter.hasNext()) {
+      return manifests;
+    }
+
+    ManifestFile first = manifestIter.next();
+
+    List<ManifestFile> merged = Lists.newArrayList();
+    ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, 
manifestIter);
+    for (Integer specId : groups.keySet()) {
+      Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId)));
+    }
+
+    return merged;
+  }
+
+  void cleanUncommitted(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+        Lists.newArrayList(mergedManifests.entrySet());
+
+    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
+      // delete any new merged manifests that aren't in the committed list
+      ManifestFile merged = entry.getValue();
+      if (!committed.contains(merged)) {
+        deleteFile(merged.path());
+        // remove the deleted file from the cache
+        mergedManifests.remove(entry.getKey());
+      }
+    }
+  }
+
+  private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, 
Iterator<ManifestFile> remaining) {
+    ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap(
+        Maps.newTreeMap(Comparator.<Integer>reverseOrder()),
+        Lists::newArrayList);
+    groups.put(first.partitionSpecId(), first);
+    remaining.forEachRemaining(manifest -> 
groups.put(manifest.partitionSpecId(), manifest));
+    return groups;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, 
List<ManifestFile> group) {
+    // use a lookback of 1 to avoid reordering the manifests. using 1 also 
means this should pack
+    // from the end so that the manifest that gets under-filled is the first 
one, which will be
+    // merged the next time.
+    ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, 
false);
+    List<List<ManifestFile>> bins = packer.packEnd(group, 
ManifestFile::length);
+
+    // process bins in parallel, but put results in the order of the bins into 
an array to preserve
+    // the order of manifests and contents. preserving the order helps avoid 
random deletes when
+    // data files are eventually aged off.
+    List<ManifestFile>[] binResults = (List<ManifestFile>[])
+        Array.newInstance(List.class, bins.size());
+
+    Tasks.range(bins.size())
+        .stopOnFailure().throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(index -> {
+          List<ManifestFile> bin = bins.get(index);
+          List<ManifestFile> outputManifests = Lists.newArrayList();
+          binResults[index] = outputManifests;
+
+          if (bin.size() == 1) {
+            // no need to rewrite
+            outputManifests.add(bin.get(0));
+            return;
+          }
+
+          // if the bin has the first manifest (the new data files or an 
appended manifest file) then only merge it
+          // if the number of manifests is above the minimum count. this is 
applied only to bins with an in-memory
+          // manifest so that large manifests don't prevent merging older 
groups.
+          if (bin.contains(first) && bin.size() < minCountToMerge) {

Review comment:
       Could you please help me to understand the following case?
   
   If we append 5 manifests on a table that has 1 existing manifest, and they are packed into 2 bins. Assume we set minCountToMerge to 4. m1-m5 are appended manifests, and m6 is an existing table manifest.
   
   Since it packs from the end, we must have a layout: [(m1, m2), (m3, m4, m5, 
m6)]
   
   The `first` is m1, right? Then the first bin will not get merged while m3-m5 are merged.

##########
File path: core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.util.BinPacking.ListPacker;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+abstract class ManifestMergeManager<F extends ContentFile<F>> {
+  private final long targetSizeBytes;
+  private final int minCountToMerge;
+  private final boolean mergeEnabled;
+
+  // cache merge results to reuse when retrying
+  private final Map<List<ManifestFile>, ManifestFile> mergedManifests = 
Maps.newConcurrentMap();
+
+  ManifestMergeManager(long targetSizeBytes, int minCountToMerge, boolean 
mergeEnabled) {
+    this.targetSizeBytes = targetSizeBytes;
+    this.minCountToMerge = minCountToMerge;
+    this.mergeEnabled = mergeEnabled;
+  }
+
+  protected abstract long snapshotId();
+  protected abstract PartitionSpec spec(int specId);
+  protected abstract void deleteFile(String location);
+  protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
+  protected abstract ManifestReader<F> newManifestReader(ManifestFile 
manifest);
+
+  Iterable<ManifestFile> mergeManifests(Iterable<ManifestFile> manifests) {
+    Iterator<ManifestFile> manifestIter = manifests.iterator();
+    if (!mergeEnabled || !manifestIter.hasNext()) {
+      return manifests;
+    }
+
+    ManifestFile first = manifestIter.next();
+
+    List<ManifestFile> merged = Lists.newArrayList();
+    ListMultimap<Integer, ManifestFile> groups = groupBySpec(first, 
manifestIter);
+    for (Integer specId : groups.keySet()) {
+      Iterables.addAll(merged, mergeGroup(first, specId, groups.get(specId)));
+    }
+
+    return merged;
+  }
+
+  void cleanUncommitted(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+        Lists.newArrayList(mergedManifests.entrySet());
+
+    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
+      // delete any new merged manifests that aren't in the committed list
+      ManifestFile merged = entry.getValue();
+      if (!committed.contains(merged)) {
+        deleteFile(merged.path());
+        // remove the deleted file from the cache
+        mergedManifests.remove(entry.getKey());
+      }
+    }
+  }
+
+  private ListMultimap<Integer, ManifestFile> groupBySpec(ManifestFile first, 
Iterator<ManifestFile> remaining) {
+    ListMultimap<Integer, ManifestFile> groups = Multimaps.newListMultimap(
+        Maps.newTreeMap(Comparator.<Integer>reverseOrder()),
+        Lists::newArrayList);
+    groups.put(first.partitionSpecId(), first);
+    remaining.forEachRemaining(manifest -> 
groups.put(manifest.partitionSpecId(), manifest));
+    return groups;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Iterable<ManifestFile> mergeGroup(ManifestFile first, int specId, 
List<ManifestFile> group) {
+    // use a lookback of 1 to avoid reordering the manifests. using 1 also 
means this should pack
+    // from the end so that the manifest that gets under-filled is the first 
one, which will be
+    // merged the next time.
+    ListPacker<ManifestFile> packer = new ListPacker<>(targetSizeBytes, 1, 
false);
+    List<List<ManifestFile>> bins = packer.packEnd(group, 
ManifestFile::length);
+
+    // process bins in parallel, but put results in the order of the bins into 
an array to preserve
+    // the order of manifests and contents. preserving the order helps avoid 
random deletes when
+    // data files are eventually aged off.
+    List<ManifestFile>[] binResults = (List<ManifestFile>[])
+        Array.newInstance(List.class, bins.size());
+
+    Tasks.range(bins.size())
+        .stopOnFailure().throwFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .run(index -> {
+          List<ManifestFile> bin = bins.get(index);
+          List<ManifestFile> outputManifests = Lists.newArrayList();
+          binResults[index] = outputManifests;
+
+          if (bin.size() == 1) {
+            // no need to rewrite
+            outputManifests.add(bin.get(0));
+            return;
+          }
+
+          // if the bin has the first manifest (the new data files or an 
appended manifest file) then only merge it
+          // if the number of manifests is above the minimum count. this is 
applied only to bins with an in-memory
+          // manifest so that large manifests don't prevent merging older 
groups.
+          if (bin.contains(first) && bin.size() < minCountToMerge) {

Review comment:
       Could you please help me to understand the following case?
   
   Suppose we append 5 manifests (`m1-m5`) on a table that has 1 existing manifest (`m6`), and they are packed into 2 bins according to `targetWeight`. Assume we set `minCountToMerge` to 4.
   
   Since it packs from the end, we must have a layout: [(m1, m2), (m3, m4, m5, 
m6)]
   
   The `first` is m1, right? Then the first bin will not get merged while m3-m5 are merged.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to