This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9245abd Add RewriteManifests operation (#200)
9245abd is described below
commit 9245abd248fb4e37b6eca39aca8bfab044d815fa
Author: bryanck <[email protected]>
AuthorDate: Sat Jun 8 14:04:14 2019 -0700
Add RewriteManifests operation (#200)
This operation can select manifests to rewrite and cluster the entries of
those manifests to make scan planning more efficient.
---
.../java/org/apache/iceberg/RewriteManifests.java | 57 ++++
api/src/main/java/org/apache/iceberg/Table.java | 8 +
.../main/java/org/apache/iceberg/Transaction.java | 7 +
.../main/java/org/apache/iceberg/BaseTable.java | 5 +
.../java/org/apache/iceberg/BaseTransaction.java | 13 +
.../java/org/apache/iceberg/ReplaceManifests.java | 261 ++++++++++++++++
.../org/apache/iceberg/TestReplaceManifests.java | 332 +++++++++++++++++++++
7 files changed, 683 insertions(+)
diff --git a/api/src/main/java/org/apache/iceberg/RewriteManifests.java
b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
new file mode 100644
index 0000000..1ed1ddf
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/RewriteManifests.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * API for rewriting manifests for a table.
+ * <p>
+ * This API selects existing manifest files, rewrites their entries into new manifest files,
+ * produces a new {@link Snapshot} of the table in which the selected manifests are replaced by
+ * the rewritten ones, and commits that snapshot as the current.
+ * <p>
+ * When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RewriteManifests extends SnapshotUpdate<RewriteManifests> {
+ /**
+ * Groups existing {@link DataFile data files} by a cluster key produced by a function. The
+ * cluster key determines which manifest a data file will be associated with. All data files
+ * with the same cluster key will be written to the same manifest (unless that manifest grows
+ * large and is split into multiple manifests).
+ *
+ * @param func Function used to cluster data files to manifests.
+ * @return this for method chaining
+ */
+ RewriteManifests clusterBy(Function<DataFile, Object> func);
+
+ /**
+ * Determines which existing {@link ManifestFile manifests} for the table should be rewritten.
+ * Manifests that do not match the predicate are kept as-is. If this is not called and no
+ * predicate is set, then all manifests will be rewritten.
+ *
+ * @param predicate Predicate used to determine which manifests to rewrite. If true then the
+ * manifest file will be included for rewrite. If false then the manifest is kept as-is.
+ * @return this for method chaining
+ */
+ RewriteManifests rewriteIf(Predicate<ManifestFile> predicate);
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java
b/api/src/main/java/org/apache/iceberg/Table.java
index d0b5821..e6390cc 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -137,6 +137,14 @@ public interface Table {
RewriteFiles newRewrite();
/**
+ * Create a new {@link RewriteManifests rewrite manifests API} to replace
manifests for this
+ * table and commit.
+ *
+ * @return a new {@link RewriteManifests}
+ */
+ RewriteManifests rewriteManifests();
+
+ /**
* Create a new {@link OverwriteFiles overwrite API} to overwrite files by a
filter expression.
*
* @return a new {@link OverwriteFiles}
diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java
b/api/src/main/java/org/apache/iceberg/Transaction.java
index d2e6320..f5b9f84 100644
--- a/api/src/main/java/org/apache/iceberg/Transaction.java
+++ b/api/src/main/java/org/apache/iceberg/Transaction.java
@@ -85,6 +85,13 @@ public interface Transaction {
RewriteFiles newRewrite();
/**
+ * Create a new {@link RewriteManifests rewrite manifests API} to replace
manifests for this table.
+ *
+ * @return a new {@link RewriteManifests}
+ */
+ RewriteManifests rewriteManifests();
+
+ /**
* Create a new {@link OverwriteFiles overwrite API} to overwrite files by a
filter expression.
*
* @return a new {@link OverwriteFiles}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 4fb69c9..9e61e1c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -114,6 +114,11 @@ public class BaseTable implements Table,
HasTableOperations {
}
@Override
+ public RewriteManifests rewriteManifests() {
+ return new ReplaceManifests(ops);
+ }
+
+ @Override
public OverwriteFiles newOverwrite() {
return new OverwriteData(ops);
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index b3ec03f..d16d143 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -143,6 +143,14 @@ class BaseTransaction implements Transaction {
}
@Override
+ public RewriteManifests rewriteManifests() {
+ checkLastOperationCommitted("RewriteManifests");
+ RewriteManifests rewrite = new ReplaceManifests(transactionOps);
+ updates.add(rewrite);
+ return rewrite;
+ }
+
+ @Override
public OverwriteFiles newOverwrite() {
checkLastOperationCommitted("OverwriteFiles");
OverwriteFiles overwrite = new OverwriteData(transactionOps);
@@ -372,6 +380,11 @@ class BaseTransaction implements Transaction {
}
@Override
+ public RewriteManifests rewriteManifests() {
+ return BaseTransaction.this.rewriteManifests();
+ }
+
+ @Override
public OverwriteFiles newOverwrite() {
return BaseTransaction.this.newOverwrite();
}
diff --git a/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
new file mode 100644
index 0000000..0077070
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ReplaceManifests.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
+import static
org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT;
+
+
+public class ReplaceManifests extends SnapshotProducer<RewriteManifests>
implements RewriteManifests {
+ private final TableOperations ops;
+ private final PartitionSpec spec;
+ private final long manifestTargetSizeBytes;
+
+ private final List<ManifestFile> keptManifests =
Collections.synchronizedList(new ArrayList<>());
+ private final List<ManifestFile> newManifests =
Collections.synchronizedList(new ArrayList<>());
+ private final Set<ManifestFile> replacedManifests =
Collections.synchronizedSet(new HashSet<>());
+ private final Map<Object, WriterWrapper> writers =
Collections.synchronizedMap(new HashMap<>());
+
+ private final AtomicInteger manifestCount = new AtomicInteger(0);
+ private final AtomicLong entryCount = new AtomicLong(0);
+
+ private final Map<String, String> summaryProps = new HashMap<>();
+
+ private Function<DataFile, Object> clusterByFunc;
+ private Predicate<ManifestFile> predicate;
+
+ private static final String REPLACED_CNT = "manifests-replaced";
+ private static final String KEPT_CNT = "manifests-kept";
+ private static final String NEW_CNT = "manifests-created";
+ private static final String ENTRY_CNT = "entries-processed";
+
+ ReplaceManifests(TableOperations ops) {
+ super(ops);
+ this.ops = ops;
+ this.spec = ops.current().spec();
+ this.manifestTargetSizeBytes =
+ ops.current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES,
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+ }
+
+ @Override
+ protected String operation() {
+ return DataOperations.REPLACE;
+ }
+
+ @Override
+ public RewriteManifests set(String property, String value) {
+ summaryProps.put(property, value);
+ return this;
+ }
+
+  /**
+   * Builds the snapshot summary: the properties supplied through {@code set} overlaid with the
+   * kept / created / replaced manifest counts and the number of entries processed.
+   */
+  @Override
+  protected Map<String, String> summary() {
+    // copy the user-supplied properties first so the counters below take precedence
+    Map<String, String> props = new HashMap<>(summaryProps);
+    props.put(KEPT_CNT, String.valueOf(keptManifests.size()));
+    props.put(NEW_CNT, String.valueOf(newManifests.size()));
+    props.put(REPLACED_CNT, String.valueOf(replacedManifests.size()));
+    props.put(ENTRY_CNT, String.valueOf(entryCount.get()));
+    return props;
+  }
+
+ @Override
+ public ReplaceManifests clusterBy(Function<DataFile, Object> func) {
+ this.clusterByFunc = func;
+ return this;
+ }
+
+ @Override
+ public ReplaceManifests rewriteIf(Predicate<ManifestFile> pred) {
+ this.predicate = pred;
+ return this;
+ }
+
+  /**
+   * Computes the manifest list for the new snapshot from the given base metadata.
+   * <p>
+   * The rewrite runs when nothing has been processed yet or when a previously processed manifest
+   * is missing from the base snapshot; otherwise earlier output is reused and only the list of
+   * kept manifests is refreshed.
+   *
+   * @param base table metadata to apply the rewrite to
+   * @return the rewritten manifests followed by the manifests that were kept as-is
+   */
+  @Override
+  public List<ManifestFile> apply(TableMetadata base) {
+    Preconditions.checkNotNull(clusterByFunc, "clusterBy function cannot be null");
+
+    List<ManifestFile> snapshotManifests = base.currentSnapshot().manifests();
+
+    if (!requiresRewrite(snapshotManifests)) {
+      // just keep any new manifests that were added since the last apply(), don't rerun
+      addExistingFromNewCommit(snapshotManifests);
+    } else {
+      // run the rewrite process
+      performRewrite(snapshotManifests);
+    }
+
+    // new manifests go at the beginning, kept manifests follow
+    List<ManifestFile> result = new ArrayList<>(newManifests);
+    result.addAll(keptManifests);
+    return result;
+  }
+
+  /**
+   * Decides whether the rewrite has to be (re)run against the given manifest list.
+   *
+   * @param currentManifests manifests of the snapshot being applied to
+   * @return true if nothing has been processed yet, or if any processed manifest is no longer
+   *         present in {@code currentManifests}
+   */
+  private boolean requiresRewrite(List<ManifestFile> currentManifests) {
+    if (replacedManifests.isEmpty()) {
+      // nothing yet processed so perform a full rewrite
+      return true;
+    }
+
+    // every processed manifest must still be in the current list, otherwise start over
+    Set<ManifestFile> current = Sets.newHashSet(currentManifests);
+    return !current.containsAll(replacedManifests);
+  }
+
+  /**
+   * Rebuilds the kept-manifest list from the given manifests, skipping every manifest that was
+   * already replaced by the rewrite.
+   *
+   * @param currentManifests manifests of the snapshot being applied to
+   */
+  private void addExistingFromNewCommit(List<ManifestFile> currentManifests) {
+    keptManifests.clear();
+    for (ManifestFile manifest : currentManifests) {
+      // anything not processed by the rewrite is carried over as-is
+      if (!replacedManifests.contains(manifest)) {
+        keptManifests.add(manifest);
+      }
+    }
+  }
+
+  /**
+   * Discards all state from a previous rewrite attempt so the rewrite can start from scratch.
+   */
+  private void reset() {
+    // NOTE(review): cleanAll() is inherited and not visible here; it presumably relies on
+    // newManifests still being populated, so it stays ahead of the clear() calls -- confirm
+    cleanAll();
+
+    newManifests.clear();
+    replacedManifests.clear();
+    keptManifests.clear();
+    writers.clear();
+
+    manifestCount.set(0);
+    entryCount.set(0);
+  }
+
+  /**
+   * Runs the rewrite over the given manifests on the worker pool.
+   * <p>
+   * Manifests rejected by the predicate are kept as-is. For every other manifest, each live entry
+   * is routed to the writer for its cluster key. All open writers are closed when processing
+   * finishes, whether it succeeded or failed.
+   *
+   * @param currentManifests manifests of the snapshot being applied to
+   * @throws RuntimeIOException if a manifest cannot be read
+   */
+  private void performRewrite(List<ManifestFile> currentManifests) {
+    reset();
+
+    try {
+      Tasks.foreach(currentManifests)
+          .executeWith(ThreadPools.getWorkerPool())
+          .run(manifest -> {
+            if (predicate != null && !predicate.test(manifest)) {
+              keptManifests.add(manifest);
+            } else {
+              replacedManifests.add(manifest);
+              long entryNum = manifest.addedFilesCount() +
+                  manifest.existingFilesCount() +
+                  manifest.deletedFilesCount();
+              // A manifest that reports no entries would otherwise divide by zero. The average
+              // is only a per-entry size estimate and goes unused when there are no entries.
+              long avgEntryLen = entryNum > 0 ? manifest.length() / entryNum : 0L;
+
+              try (ManifestReader reader =
+                  ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current()::spec)) {
+                FilteredManifest filteredManifest = reader.select(Arrays.asList("*"));
+                filteredManifest.liveEntries().forEach(
+                    entry -> appendEntry(entry, avgEntryLen, clusterByFunc.apply(entry.file()))
+                );
+
+              } catch (IOException x) {
+                throw new RuntimeIOException(x);
+              }
+            }
+          });
+    } finally {
+      // close every writer so that all written manifests are registered in newManifests
+      Tasks.foreach(writers.values())
+          .executeWith(ThreadPools.getWorkerPool())
+          .run(writer -> writer.close());
+    }
+  }
+
+  /**
+   * Hands one manifest entry to the writer for its cluster key and counts it.
+   *
+   * @param entry entry to write; must not be null
+   * @param avgEntryLen estimated size of the entry, in bytes
+   * @param key cluster key for the entry; must not be null
+   */
+  private void appendEntry(ManifestEntry entry, long avgEntryLen, Object key) {
+    Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
+    Preconditions.checkNotNull(key, "Key cannot be null");
+
+    getWriter(key).addEntry(entry, avgEntryLen);
+    entryCount.incrementAndGet();
+  }
+
+  /**
+   * Returns the writer for a cluster key, creating and registering it on first use.
+   *
+   * @param key cluster key
+   * @return the single writer associated with {@code key}
+   */
+  private WriterWrapper getWriter(Object key) {
+    // writers is a Collections.synchronizedMap, whose computeIfAbsent runs under the map's lock,
+    // so at most one wrapper is ever created per key
+    return writers.computeIfAbsent(key, ignored -> new WriterWrapper());
+  }
+
+  /**
+   * Deletes every manifest written by this operation that is not in the committed set.
+   *
+   * @param committed manifests that were committed and must be preserved
+   */
+  @Override
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
+    newManifests.stream()
+        .filter(manifest -> !committed.contains(manifest))
+        .forEach(manifest -> deleteFile(manifest.path()));
+  }
+
+ long getManifestTargetSizeBytes() {
+ return manifestTargetSizeBytes;
+ }
+
+  /**
+   * Writes the entries of one cluster key, rolling over to a new manifest once the estimated
+   * size of the current one reaches the target size. Each closed manifest is added to
+   * {@code newManifests}.
+   */
+  class WriterWrapper {
+    private ManifestWriter writer;
+    private long estimatedSize;
+
+    /**
+     * Appends an entry as an existing entry, opening or rolling over the writer first if needed.
+     *
+     * @param entry entry to append
+     * @param len estimated size of the entry, in bytes
+     */
+    synchronized void addEntry(ManifestEntry entry, long len) {
+      if (writer != null && estimatedSize >= getManifestTargetSizeBytes()) {
+        // current manifest reached the target size; finish it and start another one
+        close();
+      }
+      if (writer == null) {
+        writer = newWriter();
+      }
+
+      writer.addExisting(entry);
+      estimatedSize += len;
+    }
+
+    private ManifestWriter newWriter() {
+      estimatedSize = 0;
+      return new ManifestWriter(spec, manifestPath(manifestCount.getAndIncrement()), snapshotId());
+    }
+
+    /**
+     * Closes the open writer, if any, and registers the manifest it produced. Calling this again
+     * without an intervening {@code addEntry} is a no-op.
+     *
+     * @throws RuntimeIOException if the underlying writer fails to close
+     */
+    synchronized void close() {
+      if (writer != null) {
+        try {
+          writer.close();
+          newManifests.add(writer.toManifestFile());
+        } catch (IOException x) {
+          throw new RuntimeIOException(x);
+        } finally {
+          // drop the reference so a repeated close() cannot register the same manifest twice
+          // and a later addEntry() cannot write to a closed writer
+          writer = null;
+        }
+      }
+    }
+
+  }
+
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceManifests.java
b/core/src/test/java/org/apache/iceberg/TestReplaceManifests.java
new file mode 100644
index 0000000..7281916
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceManifests.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestReplaceManifests extends TableTestBase {
+
+  /** Clustering by file path splits a single two-file manifest into one manifest per file. */
+  @Test
+  public void testReplaceManifestsSeparate() {
+    Table table = load();
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+    long appendId = table.currentSnapshot().snapshotId();
+
+    int manifestsBefore = table.currentSnapshot().manifests().size();
+    Assert.assertEquals(1, manifestsBefore);
+
+    // cluster by path will split the manifest into two
+    RewriteManifests rewrite = table.rewriteManifests();
+    rewrite.clusterBy(file -> file.path());
+    rewrite.commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+    manifests.sort(Comparator.comparing(ManifestFile::path));
+
+    ManifestFile first = manifests.get(0);
+    ManifestFile second = manifests.get(1);
+
+    validateManifestEntries(first,
+        ids(appendId),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateManifestEntries(second,
+        ids(appendId),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  /** Clustering by a constant key merges two single-file manifests into one manifest. */
+  @Test
+  public void testReplaceManifestsConsolidate() throws IOException {
+    Table table = load();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+    // cluster by constant will combine manifests into one
+    table.rewriteManifests()
+        .clusterBy(file -> "file")
+        .commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(1, manifests.size());
+    ManifestFile combined = manifests.get(0);
+
+    // entry order inside the combined manifest is not fixed, so read which file comes first
+    boolean fileAFirst;
+    try (ManifestReader reader = ManifestReader.read(localInput(combined.path()))) {
+      fileAFirst = reader.iterator().next().path().equals(FILE_A.path());
+    }
+
+    List<DataFile> files;
+    List<Long> ids;
+    if (fileAFirst) {
+      files = Arrays.asList(FILE_A, FILE_B);
+      ids = Arrays.asList(appendIdA, appendIdB);
+    } else {
+      files = Arrays.asList(FILE_B, FILE_A);
+      ids = Arrays.asList(appendIdB, appendIdA);
+    }
+
+    validateManifestEntries(combined,
+        ids.iterator(),
+        files.iterator(),
+        statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
+  }
+
+  /**
+   * With a predicate that rejects the manifest holding FILE_A, only the other two manifests are
+   * combined and the FILE_A manifest is carried over untouched.
+   */
+  @Test
+  public void testReplaceManifestsWithFilter() throws IOException {
+    Table table = load();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend().appendFile(FILE_C).commit();
+    long appendIdC = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(3, table.currentSnapshot().manifests().size());
+
+    // keep the file A manifest, combine the other two
+    table.rewriteManifests()
+        .clusterBy(file -> "file")
+        .rewriteIf(manifest -> {
+          try (ManifestReader reader = ManifestReader.read(localInput(manifest.path()))) {
+            return !reader.iterator().next().path().equals(FILE_A.path());
+          } catch (IOException x) {
+            throw new RuntimeIOException(x);
+          }
+        })
+        .commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+    ManifestFile rewritten = manifests.get(0);
+    ManifestFile kept = manifests.get(1);
+
+    // entry order inside the rewritten manifest is not fixed, so read which file comes first
+    boolean fileBFirst;
+    try (ManifestReader reader = ManifestReader.read(localInput(rewritten.path()))) {
+      fileBFirst = reader.iterator().next().path().equals(FILE_B.path());
+    }
+
+    List<DataFile> files;
+    List<Long> ids;
+    if (fileBFirst) {
+      files = Arrays.asList(FILE_B, FILE_C);
+      ids = Arrays.asList(appendIdB, appendIdC);
+    } else {
+      files = Arrays.asList(FILE_C, FILE_B);
+      ids = Arrays.asList(appendIdC, appendIdB);
+    }
+
+    validateManifestEntries(rewritten,
+        ids.iterator(),
+        files.iterator(),
+        statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
+    validateManifestEntries(kept,
+        ids(appendIdA),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.ADDED));
+  }
+
+  /**
+   * With the target manifest size stubbed to one byte, a single cluster key still yields one
+   * manifest per entry.
+   */
+  @Test
+  public void testReplaceManifestsMaxSize() {
+    Table table = load();
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+    long appendId = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(1, table.currentSnapshot().manifests().size());
+
+    // cluster by constant will combine manifests into one but small target size will create one per entry
+    ReplaceManifests spiedRewrite = spy((ReplaceManifests) table.rewriteManifests());
+    when(spiedRewrite.getManifestTargetSizeBytes()).thenReturn(1L);
+    spiedRewrite.clusterBy(file -> "file").commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+    manifests.sort(Comparator.comparing(ManifestFile::path));
+
+    ManifestFile first = manifests.get(0);
+    ManifestFile second = manifests.get(1);
+
+    validateManifestEntries(first,
+        ids(appendId),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateManifestEntries(second,
+        ids(appendId),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.EXISTING));
+  }
+
+  /**
+   * A rewrite that was applied before another rewrite committed must redo its work at commit
+   * time, because one of the manifests it processed is gone from the snapshot.
+   */
+  @Test
+  public void testConcurrentRewriteManifest() throws IOException {
+    Table table = load();
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    // start a rewrite manifests that involves both manifests
+    RewriteManifests pendingRewrite = table.rewriteManifests();
+    pendingRewrite.clusterBy(file -> "file").apply();
+
+    // commit a rewrite manifests that only involves one manifest
+    table.rewriteManifests()
+        .clusterBy(file -> "file")
+        .rewriteIf(manifest -> {
+          try (ManifestReader reader = ManifestReader.read(localInput(manifest.path()))) {
+            return !reader.iterator().next().path().equals(FILE_A.path());
+          } catch (IOException x) {
+            throw new RuntimeIOException(x);
+          }
+        })
+        .commit();
+
+    Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+    // commit the rewrite manifests in progress - this should perform a full rewrite as the
+    // manifest with file B is no longer part of the snapshot
+    pendingRewrite.commit();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(1, manifests.size());
+    ManifestFile combined = manifests.get(0);
+
+    // entry order inside the combined manifest is not fixed, so read which file comes first
+    boolean fileAFirst;
+    try (ManifestReader reader = ManifestReader.read(localInput(combined.path()))) {
+      fileAFirst = reader.iterator().next().path().equals(FILE_A.path());
+    }
+
+    List<DataFile> files;
+    List<Long> ids;
+    if (fileAFirst) {
+      files = Arrays.asList(FILE_A, FILE_B);
+      ids = Arrays.asList(appendIdA, appendIdB);
+    } else {
+      files = Arrays.asList(FILE_B, FILE_A);
+      ids = Arrays.asList(appendIdB, appendIdA);
+    }
+
+    validateManifestEntries(combined,
+        ids.iterator(),
+        files.iterator(),
+        statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING));
+  }
+
+  /**
+   * An append committed between apply() and commit() of a rewrite is kept as its own manifest;
+   * the rewrite output is reused and listed first.
+   */
+  @Test
+  public void testAppendDuringRewriteManifest() {
+    Table table = load();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    // start the rewrite manifests
+    RewriteManifests pendingRewrite = table.rewriteManifests();
+    pendingRewrite.clusterBy(file -> "file").apply();
+
+    // append a file
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    Assert.assertEquals(2, table.currentSnapshot().manifests().size());
+
+    // commit the rewrite manifests in progress
+    pendingRewrite.commit();
+
+    // the rewrite should only affect the first manifest, so we will end up with 2 manifests even
+    // though we have a single cluster key, rewritten one should be the first in the list
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+
+    ManifestFile rewritten = manifests.get(0);
+    ManifestFile appended = manifests.get(1);
+
+    validateManifestEntries(rewritten,
+        ids(appendIdA),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+    validateManifestEntries(appended,
+        ids(appendIdB),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.ADDED));
+  }
+
+  /**
+   * A rewrite committed between apply() and commit() of an append does not disturb the append;
+   * the appended manifest ends up ahead of the rewritten one.
+   */
+  @Test
+  public void testRewriteManifestDuringAppend() {
+    Table table = load();
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long appendIdA = table.currentSnapshot().snapshotId();
+
+    // start an append
+    AppendFiles pendingAppend = table.newFastAppend();
+    pendingAppend.appendFile(FILE_B).apply();
+
+    // rewrite the manifests - only affects the first
+    table.rewriteManifests()
+        .clusterBy(file -> "file")
+        .commit();
+
+    Assert.assertEquals(1, table.currentSnapshot().manifests().size());
+
+    // commit the append in progress
+    pendingAppend.commit();
+    long appendIdB = table.currentSnapshot().snapshotId();
+
+    List<ManifestFile> manifests = table.currentSnapshot().manifests();
+    Assert.assertEquals(2, manifests.size());
+
+    // last append should be the first in the list
+    ManifestFile appended = manifests.get(0);
+    ManifestFile rewritten = manifests.get(1);
+
+    validateManifestEntries(appended,
+        ids(appendIdB),
+        files(FILE_B),
+        statuses(ManifestEntry.Status.ADDED));
+    validateManifestEntries(rewritten,
+        ids(appendIdA),
+        files(FILE_A),
+        statuses(ManifestEntry.Status.EXISTING));
+  }
+}