This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch release/1.0.x
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/release/1.0.x by this push:
new 9d1514b49 Feature: Rollback compaction on conflict (#1285)
9d1514b49 is described below
commit 9d1514b4906a6a66c1ca2e45aa4e72ff848a044c
Author: Prashant Singh <[email protected]>
AuthorDate: Fri Jun 20 07:59:34 2025 -0700
Feature: Rollback compaction on conflict (#1285)
The intention is to make the catalog smarter: in case of a conflict, revert the
compaction commits so that the writers who are actually adding or removing data
in the table can succeed. In a sense, this treats compaction as always being a
lower-priority process.
Presently, the REST catalog client creates the snapshot and asks the REST
server to apply the snapshot, expressing this as a combination of requirements
and updates.
Polaris could apply some basic inference and generate some updates to the
metadata, given that a property is enabled at a table level: it will revert the
commit which was created by compaction and let the write succeed.
I had this PR in OSS, which was essentially doing this at the client end,
but we think it's best if we do this at the server end, to support more such
clients.
How to use this
Enable the catalog-level configuration
polaris.config.rollback.compaction.on-conflicts.enabled. When this is enabled,
Polaris will roll back those REPLACE-operation snapshots which have the
property polaris.internal.rollback.compaction.on-conflict in their snapshot
summary, in order to resolve conflicts at the server end.
A sample use case: there is a deployment of Polaris where this config is
enabled, and there is an auto-compaction (maintenance) job which is updating
the table state; it records in the snapshot summary that
polaris.internal.rollback.compaction.on-conflict is true. Now a backfill
process that has been running for 8 hours wants to commit but can't, because
the compaction job committed first. In this case it will reach out to Polaris,
and Polaris will see if the snapshot of compaction, aka replace snapsho [...]
Devlist: https://lists.apache.org/thread/8k8t77dgk1vc124fnb61932bdp9kf1lc
---
.../polaris/core/config/FeatureConfiguration.java | 11 +
.../quarkus/catalog/IcebergCatalogTest.java | 359 +++++++++++++++++++++
.../catalog/iceberg/CatalogHandlerUtils.java | 289 +++++++++++++++--
3 files changed, 636 insertions(+), 23 deletions(-)
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 1e0e96329..fe265c307 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -287,4 +287,15 @@ public class FeatureConfiguration<T> extends
PolarisConfiguration<T> {
+ "This should only be set to 'true' for tests!")
.defaultValue(false)
.buildFeatureConfiguration();
+
+ public static final FeatureConfiguration<Boolean>
ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS =
+ PolarisConfiguration.<Boolean>builder()
+ .key("ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS")
+
.catalogConfig("polaris.config.rollback.compaction.on-conflicts.enabled")
+ .description(
+ "Rollback replace snapshots created by compaction which have "
+ +
"polaris.internal.conflict-resolution.by-operation-type.replace property set to
rollback "
+ + "in their snapshot summary")
+ .defaultValue(false)
+ .buildFeatureConfiguration();
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 2b4c4205c..becd3dcfe 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -19,6 +19,8 @@
package org.apache.polaris.service.quarkus.catalog;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
@@ -29,6 +31,8 @@ import static org.mockito.Mockito.when;
import com.azure.core.exception.HttpResponseException;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
import io.quarkus.test.junit.QuarkusMock;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
@@ -37,6 +41,7 @@ import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.SecurityContext;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.Arrays;
@@ -48,16 +53,29 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
@@ -68,8 +86,11 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -111,6 +132,7 @@ import
org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
+import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils;
import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
@@ -135,7 +157,9 @@ import org.apache.polaris.service.task.TaskFileIOSupplier;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
import org.apache.polaris.service.types.TableUpdateNotification;
+import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.assertj.core.configuration.PreferredAssumptionException;
import org.junit.jupiter.api.AfterEach;
@@ -164,6 +188,15 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
PreferredAssumptionException.JUNIT5);
}
+ DeleteFile FILE_A_DELETES =
+ FileMetadata.deleteFileBuilder(SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/data-a-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id_bucket=0") // easy way to set partition data
for now
+ .withRecordCount(1)
+ .build();
+
public static class Profile implements QuarkusTestProfile {
@Override
@@ -547,6 +580,332 @@ public abstract class IcebergCatalogTest extends
CatalogTests<IcebergCatalog> {
.hasMessageContaining("Parent");
}
+ @Test
+ public void testConcurrentWritesWithRollbackNonEmptyTable() {
+ IcebergCatalog catalog = this.catalog();
+ if (this.requiresNamespaceCreate()) {
+ catalog.createNamespace(NS);
+ }
+
+ Table table = catalog.buildTable(TABLE,
SCHEMA).withPartitionSpec(SPEC).create();
+ this.assertNoFiles(table);
+
+ // commit FILE_A
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+ this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+ table.refresh();
+
+ long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+ // Apply the deletes based on FILE_A
+ // this should conflict when we try to commit without the change.
+ RowDelta originalRowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .validateFromSnapshot(lastSnapshotId)
+ .validateDataFilesExist(List.of(FILE_A.location()));
+ // Make client ready with updates, don't reach out to IRC server yet
+ Snapshot s = originalRowDelta.apply();
+ TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+ TableMetadata base = ops.current();
+ TableMetadata.Builder update = TableMetadata.buildFrom(base);
+ update.setBranchSnapshot(s, "main");
+ TableMetadata updatedMetadata = update.build();
+ List<MetadataUpdate> updates = updatedMetadata.changes();
+ List<UpdateRequirement> requirements =
UpdateRequirements.forUpdateTable(base, updates);
+ UpdateTableRequest request = UpdateTableRequest.create(TABLE,
requirements, updates);
+
+ // replace FILE_A with FILE_B
+ // set the snapshot property in the summary to make this snapshot
+ // rollback-able.
+ catalog
+ .loadTable(TABLE)
+ .newRewrite()
+ .addFile(FILE_B)
+ .deleteFile(FILE_A)
+ .set("polaris.internal.conflict-resolution.by-operation-type.replace",
"rollback")
+ .commit();
+
+ try {
+ // Now call IRC server to commit delete operation.
+ CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5,
true);
+ catalogHandlerUtils.commit(((BaseTable)
catalog.loadTable(TABLE)).operations(), request);
+ } catch (Exception e) {
+ fail("Rollback Compaction on conflict feature failed : " + e);
+ }
+
+ table.refresh();
+
+ // Assert only 2 snapshots and no snapshot of REPLACE left.
+ Snapshot currentSnapshot =
table.snapshot(table.refs().get("main").snapshotId());
+ int totalSnapshots = 1;
+ while (currentSnapshot.parentId() != null) {
+ // no snapshot in the hierarchy for REPLACE operations
+
assertThat(currentSnapshot.operation()).isNotEqualTo(DataOperations.REPLACE);
+ currentSnapshot = table.snapshot(currentSnapshot.parentId());
+ totalSnapshots += 1;
+ }
+ assertThat(totalSnapshots).isEqualTo(2);
+
+ // Inspect the files 1 DELETE file i.e. FILE_A_DELETES and 1 DATA FILE
FILE_A
+ try {
+ try (CloseableIterable<FileScanTask> tasks =
table.newScan().planFiles()) {
+ List<CharSequence> dataFilePaths =
+ Streams.stream(tasks)
+ .map(ContentScanTask::file)
+ .map(ContentFile::location)
+ .collect(Collectors.toList());
+ List<CharSequence> deleteFilePaths =
+ Streams.stream(tasks)
+ .flatMap(t -> t.deletes().stream().map(ContentFile::location))
+ .collect(Collectors.toList());
+ ((ListAssert)
+ Assertions.assertThat(dataFilePaths)
+ .as("Should contain expected number of data files", new
Object[0]))
+ .hasSize(1);
+ ((ListAssert)
+ Assertions.assertThat(deleteFilePaths)
+ .as("Should contain expected number of delete files", new
Object[0]))
+ .hasSize(1);
+ ((AbstractCollectionAssert)
+ Assertions.assertThat(CharSequenceSet.of(dataFilePaths))
+ .as("Should contain correct file paths", new Object[0]))
+ .isEqualTo(
+ CharSequenceSet.of(
+ Iterables.transform(Arrays.asList(FILE_A),
ContentFile::location)));
+ ((AbstractCollectionAssert)
+ Assertions.assertThat(CharSequenceSet.of(deleteFilePaths))
+ .as("Should contain correct file paths", new Object[0]))
+ .isEqualTo(
+ CharSequenceSet.of(
+ Iterables.transform(Arrays.asList(FILE_A_DELETES),
ContentFile::location)));
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Test
+ public void
testConcurrentWritesWithRollbackWithNonReplaceSnapshotInBetween() {
+ IcebergCatalog catalog = this.catalog();
+ if (this.requiresNamespaceCreate()) {
+ catalog.createNamespace(NS);
+ }
+
+ Table table = catalog.buildTable(TABLE,
SCHEMA).withPartitionSpec(SPEC).create();
+ this.assertNoFiles(table);
+
+ // commit FILE_A
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+ this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+ table.refresh();
+
+ long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+ // Apply the deletes based on FILE_A
+ // this should conflict when we try to commit without the change.
+ RowDelta originalRowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .validateFromSnapshot(lastSnapshotId)
+ .validateDataFilesExist(List.of(FILE_A.location()));
+ // Make client ready with updates, don't reach out to IRC server yet
+ Snapshot s = originalRowDelta.apply();
+ TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+ TableMetadata base = ops.current();
+ TableMetadata.Builder update = TableMetadata.buildFrom(base);
+ update.setBranchSnapshot(s, "main");
+ TableMetadata updatedMetadata = update.build();
+ List<MetadataUpdate> updates = updatedMetadata.changes();
+ List<UpdateRequirement> requirements =
UpdateRequirements.forUpdateTable(base, updates);
+ UpdateTableRequest request = UpdateTableRequest.create(TABLE,
requirements, updates);
+
+ // replace FILE_A with FILE_B
+ // commit the transaction.
+ catalog
+ .loadTable(TABLE)
+ .newRewrite()
+ .addFile(FILE_B)
+ .deleteFile(FILE_A)
+ .set("polaris.internal.conflict-resolution.by-operation-type.replace",
"rollback")
+ .commit();
+
+ // commit FILE_C
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).commit();
+ CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+ Assertions.assertThatThrownBy(
+ () ->
+ catalogHandlerUtils.commit(
+ ((BaseTable) catalog.loadTable(TABLE)).operations(),
request))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Requirement failed: branch main has changed");
+
+ table.refresh();
+
+ // Assert only 3 snapshots
+ Snapshot currentSnapshot =
table.snapshot(table.refs().get("main").snapshotId());
+ int totalSnapshots = 1;
+ while (currentSnapshot.parentId() != null) {
+ currentSnapshot = table.snapshot(currentSnapshot.parentId());
+ totalSnapshots += 1;
+ }
+ assertThat(totalSnapshots).isEqualTo(3);
+ this.assertFiles(catalog.loadTable(TABLE), FILE_B, FILE_C);
+ }
+
+ @Test
+ public void
+
testConcurrentWritesWithRollbackEnableWithToRollbackSnapshotReferencedByOtherBranch()
{
+ IcebergCatalog catalog = this.catalog();
+ if (this.requiresNamespaceCreate()) {
+ catalog.createNamespace(NS);
+ }
+
+ Table table = catalog.buildTable(TABLE,
SCHEMA).withPartitionSpec(SPEC).create();
+ this.assertNoFiles(table);
+
+ // commit FILE_A
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+ this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+ table.refresh();
+
+ long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+ // Apply the deletes based on FILE_A
+ // this should conflict when we try to commit without the change.
+ RowDelta originalRowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .validateFromSnapshot(lastSnapshotId)
+ .validateDataFilesExist(List.of(FILE_A.location()));
+ // Make client ready with updates, don't reach out to IRC server yet
+ Snapshot s = originalRowDelta.apply();
+ TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+ TableMetadata base = ops.current();
+ TableMetadata.Builder update = TableMetadata.buildFrom(base);
+ update.setBranchSnapshot(s, "main");
+ TableMetadata updatedMetadata = update.build();
+ List<MetadataUpdate> updates = updatedMetadata.changes();
+ List<UpdateRequirement> requirements =
UpdateRequirements.forUpdateTable(base, updates);
+ UpdateTableRequest request = UpdateTableRequest.create(TABLE,
requirements, updates);
+
+ // replace FILE_A with FILE_B
+ catalog
+ .loadTable(TABLE)
+ .newRewrite()
+ .addFile(FILE_B)
+ .deleteFile(FILE_A)
+ .set("polaris.internal.conflict-resolution.by-operation-type.replace",
"rollback")
+ .commit();
+
+ Table t = catalog.loadTable(TABLE);
+ // add another branch B
+ t.manageSnapshots()
+ .createBranch("non-main")
+ .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+ .commit();
+ // now add more files to non-main branch
+
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+ CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+ Assertions.assertThatThrownBy(
+ () ->
+ catalogHandlerUtils.commit(
+ ((BaseTable) catalog.loadTable(TABLE)).operations(),
request))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Requirement failed: branch main has changed");
+
+ table.refresh();
+
+ // Assert only 3 snapshots
+ Snapshot currentSnapshot =
table.snapshot(table.refs().get("main").snapshotId());
+ int totalSnapshots = 1;
+ while (currentSnapshot.parentId() != null) {
+ currentSnapshot = table.snapshot(currentSnapshot.parentId());
+ totalSnapshots += 1;
+ }
+ assertThat(totalSnapshots).isEqualTo(2);
+ this.assertFiles(catalog.loadTable(TABLE), FILE_B);
+ }
+
+ @Test
+ public void
testConcurrentWritesWithRollbackWithConcurrentWritesToDifferentBranches() {
+ IcebergCatalog catalog = this.catalog();
+ if (this.requiresNamespaceCreate()) {
+ catalog.createNamespace(NS);
+ }
+
+ Table table = catalog.buildTable(TABLE,
SCHEMA).withPartitionSpec(SPEC).create();
+ this.assertNoFiles(table);
+
+ // commit FILE_A to main branch
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+ this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+ table.refresh();
+
+ Table t = catalog.loadTable(TABLE);
+ // add another branch B
+ t.manageSnapshots()
+ .createBranch("non-main")
+ .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+ .commit();
+
+ long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+ // Apply the deletes based on FILE_A
+ // this should conflict when we try to commit without the change.
+ RowDelta originalRowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .validateFromSnapshot(lastSnapshotId)
+ .validateDataFilesExist(List.of(FILE_A.location()));
+ // Make client ready with updates, don't reach out to IRC server yet
+ Snapshot s = originalRowDelta.apply();
+ TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+ TableMetadata base = ops.current();
+ TableMetadata.Builder update = TableMetadata.buildFrom(base);
+ update.setBranchSnapshot(s, "main");
+ TableMetadata updatedMetadata = update.build();
+ List<MetadataUpdate> updates = updatedMetadata.changes();
+ List<UpdateRequirement> requirements =
UpdateRequirements.forUpdateTable(base, updates);
+ UpdateTableRequest request = UpdateTableRequest.create(TABLE,
requirements, updates);
+
+ // replace FILE_A with FILE_B on main branch
+ catalog
+ .loadTable(TABLE)
+ .newRewrite()
+ .addFile(FILE_B)
+ .deleteFile(FILE_A)
+ .set("polaris.internal.conflict-resolution.by-operation-type.replace",
"rollback")
+ .commit();
+
+ // now add more files to non-main branch, this will make sequence number
non monotonic for main
+ // branch
+
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+ CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+ Assertions.assertThatThrownBy(
+ () ->
+ catalogHandlerUtils.commit(
+ ((BaseTable) catalog.loadTable(TABLE)).operations(),
request))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Requirement failed: branch main has changed");
+
+ table.refresh();
+
+ // Assert only 3 snapshots
+ Snapshot currentSnapshot =
table.snapshot(table.refs().get("main").snapshotId());
+ int totalSnapshots = 1;
+ while (currentSnapshot.parentId() != null) {
+ currentSnapshot = table.snapshot(currentSnapshot.parentId());
+ totalSnapshots += 1;
+ }
+ assertThat(totalSnapshots).isEqualTo(2);
+ this.assertFiles(catalog.loadTable(TABLE), FILE_B);
+ }
+
@Test
public void testValidateNotificationWhenTableAndNamespacesDontExist() {
Assumptions.assumeTrue(
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
index aa99d53f5..ae879ea5f 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
@@ -22,16 +22,21 @@ import static
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAUL
import static
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
+import java.lang.reflect.Field;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,9 +44,13 @@ import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -74,6 +83,8 @@ import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.view.BaseView;
import org.apache.iceberg.view.SQLViewRepresentation;
@@ -85,6 +96,8 @@ import org.apache.iceberg.view.ViewRepresentation;
import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.context.RealmContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* CODE_COPIED_TO_POLARIS Copied from CatalogHandler in Iceberg 1.8.0 Contains
a collection of
@@ -92,17 +105,41 @@ import org.apache.polaris.core.context.RealmContext;
*/
@ApplicationScoped
public class CatalogHandlerUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CatalogHandlerUtils.class);
+
private static final Schema EMPTY_SCHEMA = new Schema();
private static final String INITIAL_PAGE_TOKEN = "";
+ private static final String CONFLICT_RESOLUTION_ACTION =
+ "polaris.internal.conflict-resolution.by-operation-type.replace";
+ private static final Field LAST_SEQUENCE_NUMBER_FIELD;
- private final RealmContext realmContext;
- private final PolarisConfigurationStore configurationStore;
+ static {
+ try {
+ LAST_SEQUENCE_NUMBER_FIELD =
+ TableMetadata.Builder.class.getDeclaredField("lastSequenceNumber");
+ LAST_SEQUENCE_NUMBER_FIELD.setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Unable to access field", e);
+ }
+ }
+
+ private final int maxCommitRetries;
+ private final boolean rollbackCompactionEnabled;
@Inject
public CatalogHandlerUtils(
RealmContext realmContext, PolarisConfigurationStore configurationStore)
{
- this.realmContext = realmContext;
- this.configurationStore = configurationStore;
+ this(
+ configurationStore.getConfiguration(
+ realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES),
+ configurationStore.getConfiguration(
+ realmContext,
FeatureConfiguration.ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS));
+ }
+
+ @VisibleForTesting
+ public CatalogHandlerUtils(int maxCommitRetries, boolean
rollbackCompactionEnabled) {
+ this.maxCommitRetries = maxCommitRetries;
+ this.rollbackCompactionEnabled = rollbackCompactionEnabled;
}
/**
@@ -421,11 +458,12 @@ public class CatalogHandlerUtils {
return ops.current();
}
- protected TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ @VisibleForTesting
+ public TableMetadata commit(TableOperations ops, UpdateTableRequest request)
{
AtomicBoolean isRetry = new AtomicBoolean(false);
try {
Tasks.foreach(ops)
- .retry(maxCommitRetries())
+ .retry(maxCommitRetries)
.exponentialBackoff(
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -435,30 +473,106 @@ public class CatalogHandlerUtils {
.run(
taskOps -> {
TableMetadata base = isRetry.get() ? taskOps.refresh() :
taskOps.current();
- isRetry.set(true);
- // validate requirements
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
+ TableMetadata newBase = base;
try {
- request.requirements().forEach(requirement ->
requirement.validate(base));
+ request.requirements().forEach((requirement) ->
requirement.validate(base));
+ } catch (CommitFailedException e) {
+ if (!rollbackCompactionEnabled) {
+ // wrap and rethrow outside of tasks to avoid unnecessary
retry
+ throw new ValidationFailureException(e);
+ }
+ LOGGER.debug(
+ "Attempting to Rollback replace operations for table={},
with current-snapshot-id={}",
+ base.uuid(),
+ base.currentSnapshot().snapshotId());
+ UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+ findAssertRefSnapshotID(request);
+ MetadataUpdate.SetSnapshotRef setSnapshotRef =
findSetSnapshotRefUpdate(request);
+
+ if (assertRefSnapshotId == null || setSnapshotRef == null) {
+ // This implies the request was not trying to add a
snapshot.
+ LOGGER.debug(
+ "Giving up on Rollback replace operations for
table={}, with current-snapshot-id={}, as operation doesn't attempts to add a
single snapshot",
+ base.uuid(),
+ base.currentSnapshot().snapshotId());
+ // wrap and rethrow outside of tasks to avoid unnecessary
retry
+ throw new ValidationFailureException(e);
+ }
+
+ // snapshot-id the client expects the table
current_snapshot_id
+ long expectedCurrentSnapshotId =
assertRefSnapshotId.snapshotId();
+
+ MetadataUpdate.AddSnapshot snapshotToBeAdded =
findAddSnapshotUpdate(request);
+ if (snapshotToBeAdded == null) {
+ // Re-throw if, there's no snapshot data to be added.
+ // wrap and rethrow outside of tasks to avoid unnecessary
retry
+ throw new ValidationFailureException(e);
+ }
+
+ LOGGER.info(
+ "Attempting to Rollback replace operation for table={},
with current-snapshot-id={}, to snapshot={}",
+ base.uuid(),
+ base.currentSnapshot().snapshotId(),
+ snapshotToBeAdded.snapshot().snapshotId());
+
+ List<MetadataUpdate> metadataUpdates =
+ generateUpdatesToRemoveNoopSnapshot(
+ base, expectedCurrentSnapshotId,
setSnapshotRef.name());
+
+ if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+ // Nothing can be done as this implies that there were not
all
+ // No-op snapshots (REPLACE) between
expectedCurrentSnapshotId and
+ // currentSnapshotId. hence re-throw the exception caught.
+ // wrap and rethrow outside of tasks to avoid unnecessary
retry
+ throw new ValidationFailureException(e);
+ }
+ // Set back the ref we wanted to set, back to the snapshot-id
+ // the client is expecting the table to be at.
+ metadataBuilder.setBranchSnapshot(
+ expectedCurrentSnapshotId, setSnapshotRef.name());
+
+ // apply the remove snapshots update in the current metadata.
+ // NOTE: we need to setRef to expectedCurrentSnapshotId
first and then apply
+ // remove, as otherwise the remove will drop the reference.
+ // NOTE: we can skip removing the now orphan base. It's not
a hard requirement.
+ // just something good to do, and not leave for Remove
Orphans.
+ // Ref rolled back update correctly to snapshot to be
committed parent now.
+ metadataUpdates.forEach((update ->
update.applyTo(metadataBuilder)));
+ newBase =
+ setAppropriateLastSeqNumber(
+ metadataBuilder,
+ base.uuid(),
+ base.lastSequenceNumber(),
+
base.snapshot(expectedCurrentSnapshotId).sequenceNumber())
+ .build();
+ LOGGER.info(
+ "Successfully roll-backed replace operation for
table={}, with current-snapshot-id={}, to snapshot={}",
+ base.uuid(),
+ base.currentSnapshot().snapshotId(),
+ newBase.currentSnapshot().snapshotId());
+ }
+ // double check if the requirements passes now.
+ try {
+ TableMetadata baseWithRemovedSnaps = newBase;
+ request
+ .requirements()
+ .forEach((requirement) ->
requirement.validate(baseWithRemovedSnaps));
} catch (CommitFailedException e) {
// wrap and rethrow outside of tasks to avoid unnecessary
retry
throw new ValidationFailureException(e);
}
- // apply changes
- TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
- request.updates().forEach(update ->
update.applyTo(metadataBuilder));
-
- TableMetadata updated = metadataBuilder.build();
+ TableMetadata.Builder newMetadataBuilder =
TableMetadata.buildFrom(newBase);
+ request.updates().forEach((update) ->
update.applyTo(newMetadataBuilder));
+ TableMetadata updated = newMetadataBuilder.build();
if (updated.changes().isEmpty()) {
// do not commit if the metadata has not changed
return;
}
-
- // commit
taskOps.commit(base, updated);
});
-
} catch (ValidationFailureException e) {
throw e.wrapped();
}
@@ -466,6 +580,140 @@ public class CatalogHandlerUtils {
return ops.current();
}
+ /**
+  * Locates the single {@link UpdateRequirement.AssertRefSnapshotID} requirement in a request.
+  *
+  * @param request the table update request whose requirements are scanned
+  * @return the requirement when exactly one is present; {@code null} when there is none, or
+  *     when there is more than one (rolling back is not considered safe in that case)
+  */
+ private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+     UpdateTableRequest request) {
+   UpdateRequirement.AssertRefSnapshotID match = null;
+   for (UpdateRequirement candidate : request.requirements()) {
+     if (!(candidate instanceof UpdateRequirement.AssertRefSnapshotID)) {
+       continue;
+     }
+     if (match != null) {
+       // a second ref assertion: not safe to roll back, make this a no-op
+       return null;
+     }
+     match = (UpdateRequirement.AssertRefSnapshotID) candidate;
+   }
+   return match;
+ }
+
+ /**
+  * Builds the metadata updates that drop the rollback-able snapshots sitting between the
+  * current tip of {@code updateRefName} and {@code expectedCurrentSnapshotId}.
+  *
+  * <p>The walk starts at the tip of the ref and follows parent ids. It is abandoned as soon as
+  * a snapshot is found that is missing from the metadata, is not a rollback-able snapshot (see
+  * {@link #isRollbackSnapshot}), or is still reachable from any other branch or tag.
+  *
+  * @param base the table metadata the commit is being applied to
+  * @param expectedCurrentSnapshotId the snapshot id the client expects the ref to point at
+  * @param updateRefName the name of the ref being updated
+  * @return a list holding one {@link MetadataUpdate.RemoveSnapshots} update when the expected
+  *     snapshot was reached, or {@code null} when the rollback must not be attempted
+  */
+ private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+     TableMetadata base, long expectedCurrentSnapshotId, String updateRefName) {
+   SnapshotRef updateRef = base.ref(updateRefName);
+   if (updateRef == null) {
+     // nothing to roll back when the ref is not part of the current metadata
+     LOGGER.debug(
+         "Giving up rolling back table {}, ref {} does not exist",
+         base.uuid(),
+         updateRefName);
+     return null;
+   }
+
+   // find all the snapshots we want to retain which are not part of the ref being updated.
+   Set<Long> idsToRetain = Sets.newHashSet();
+   for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+     if (ref.getKey().equals(updateRefName)) {
+       continue;
+     }
+     long refSnapshotId = ref.getValue().snapshotId();
+     idsToRetain.add(refSnapshotId);
+     // Always check the ancestry for both branches and tags, mostly for the case where a
+     // branch was created and dropped, then a tag was created, a rollback happened, the tag
+     // was dropped and the branch was re-created on it.
+     for (Snapshot ancestor : SnapshotUtil.ancestorsOf(refSnapshotId, base::snapshot)) {
+       idsToRetain.add(ancestor.snapshotId());
+     }
+   }
+
+   Long snapshotId = updateRef.snapshotId(); // current tip of the given branch
+   // ensure this branch has the latest sequence number.
+   long expectedSequenceNumber = base.lastSequenceNumber();
+   // Unexpected state as table's current sequence number is not equal to the
+   // most recent snapshot the ref points to.
+   if (expectedSequenceNumber != base.snapshot(snapshotId).sequenceNumber()) {
+     LOGGER.debug(
+         "Giving up rolling back table {} to snapshot {}, ref current snapshot sequence number {} is not equal expected sequence number {}",
+         base.uuid(),
+         snapshotId,
+         base.snapshot(snapshotId).sequenceNumber(),
+         expectedSequenceNumber);
+     return null;
+   }
+
+   // LinkedHashSet keeps the removal order tip-first.
+   Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+   while (snapshotId != null && !Objects.equals(snapshotId, expectedCurrentSnapshotId)) {
+     Snapshot snap = base.snapshot(snapshotId);
+     if (snap == null) {
+       // the ancestor is no longer tracked by the metadata, the chain cannot be verified
+       LOGGER.debug(
+           "Giving up rolling back table {} to snapshot {}, ancestor snapshot {} is not present in the table metadata",
+           base.uuid(),
+           expectedCurrentSnapshotId,
+           snapshotId);
+       return null;
+     }
+     if (!isRollbackSnapshot(snap)) {
+       LOGGER.debug(
+           "Giving up rolling back table {} to snapshot {}, snapshot {} is not a rollback-able replace snapshot",
+           base.uuid(),
+           expectedCurrentSnapshotId,
+           snapshotId);
+       return null;
+     }
+     if (idsToRetain.contains(snapshotId)) {
+       LOGGER.debug(
+           "Giving up rolling back table {} to snapshot {}, snapshot {} is referenced by another branch or tag ancestor",
+           base.uuid(),
+           expectedCurrentSnapshotId,
+           snapshotId);
+       return null;
+     }
+     snapshotsToRemove.add(snap.snapshotId());
+     snapshotId = snap.parentId();
+   }
+
+   if (snapshotId == null) {
+     // walked past the root of the history without meeting the expected snapshot
+     return null;
+   }
+
+   // only build the update once the expected snapshot has actually been reached
+   List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+   updateToRemoveSnapshot.add(new MetadataUpdate.RemoveSnapshots(snapshotsToRemove));
+   return updateToRemoveSnapshot;
+ }
+
+ /**
+  * Tells whether a snapshot may be rolled back to resolve a commit conflict.
+  *
+  * <p>Only {@code REPLACE} snapshots whose summary carries the {@code
+  * CONFLICT_RESOLUTION_ACTION} property with the value {@code rollback} (case-insensitive)
+  * qualify.
+  *
+  * @param snapshot the snapshot to inspect
+  * @return {@code true} when the snapshot is allowed to be rolled back
+  */
+ private boolean isRollbackSnapshot(Snapshot snapshot) {
+   if (!DataOperations.REPLACE.equals(snapshot.operation())) {
+     return false;
+   }
+   String resolutionAction =
+       PropertyUtil.propertyAsString(snapshot.summary(), CONFLICT_RESOLUTION_ACTION, "");
+   return resolutionAction.equalsIgnoreCase("rollback");
+ }
+
+ /**
+  * Locates the single {@link MetadataUpdate.SetSnapshotRef} update in a request.
+  *
+  * @param request the table update request whose updates are scanned
+  * @return the update when exactly one is present; {@code null} when there is none, or when
+  *     there is more than one (rolling back is not considered safe in that case)
+  */
+ private MetadataUpdate.SetSnapshotRef findSetSnapshotRefUpdate(UpdateTableRequest request) {
+   MetadataUpdate.SetSnapshotRef match = null;
+   for (MetadataUpdate candidate : request.updates()) {
+     if (!(candidate instanceof MetadataUpdate.SetSnapshotRef)) {
+       continue;
+     }
+     if (match != null) {
+       // a second set-snapshot-ref update: not safe to roll back, make this a no-op
+       return null;
+     }
+     match = (MetadataUpdate.SetSnapshotRef) candidate;
+   }
+   return match;
+ }
+
+ /**
+  * Locates the single {@link MetadataUpdate.AddSnapshot} update in a request.
+  *
+  * @param request the table update request whose updates are scanned
+  * @return the update when exactly one is present; {@code null} when there is none, or when
+  *     there is more than one (rolling back is not considered safe in that case)
+  */
+ private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest request) {
+   MetadataUpdate.AddSnapshot match = null;
+   for (MetadataUpdate candidate : request.updates()) {
+     if (!(candidate instanceof MetadataUpdate.AddSnapshot)) {
+       continue;
+     }
+     if (match != null) {
+       // a second add-snapshot update: not safe to roll back, make this a no-op
+       return null;
+     }
+     match = (MetadataUpdate.AddSnapshot) candidate;
+   }
+   return match;
+ }
+
+ /**
+  * Moves the last sequence number tracked by the metadata builder back to
+  * {@code expectedSequenceNumber}.
+  *
+  * <p>The client generates the manifest list, so the sequence number of the snapshot being
+  * committed cannot be changed; the only option left is to change the sequence number tracked
+  * by metadata.json. The value set here should be the sequence number of the snapshot the
+  * branch tip is rolled back to, as the new commit is applied on top of it.
+  *
+  * @param metadataBuilder the builder whose last sequence number is overwritten
+  * @param tableUUID the table uuid, used for logging only
+  * @param currentSequenceNumber the sequence number before the change, used for logging only
+  * @param expectedSequenceNumber the sequence number to set on the builder
+  * @return the same {@code metadataBuilder} instance
+  * @throws RuntimeException wrapping the {@link IllegalAccessException} when the field cannot
+  *     be written
+  */
+ private TableMetadata.Builder setAppropriateLastSeqNumber(
+     TableMetadata.Builder metadataBuilder,
+     String tableUUID,
+     long currentSequenceNumber,
+     long expectedSequenceNumber) {
+   // TODO: Get rid of the reflection call once TableMetadata have API for it.
+   try {
+     LAST_SEQUENCE_NUMBER_FIELD.set(metadataBuilder, expectedSequenceNumber);
+   } catch (IllegalAccessException ex) {
+     throw new RuntimeException(ex);
+   }
+   // reached only when the field was written successfully
+   LOGGER.info(
+       "Setting table uuid:{} last sequence number from:{} to {}",
+       tableUUID,
+       currentSequenceNumber,
+       expectedSequenceNumber);
+   return metadataBuilder;
+ }
+
private BaseView asBaseView(View view) {
Preconditions.checkState(
view instanceof BaseView, "Cannot wrap catalog that does not produce
BaseView");
@@ -565,7 +813,7 @@ public class CatalogHandlerUtils {
AtomicBoolean isRetry = new AtomicBoolean(false);
try {
Tasks.foreach(ops)
- .retry(maxCommitRetries())
+ .retry(maxCommitRetries)
.exponentialBackoff(
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -606,9 +854,4 @@ public class CatalogHandlerUtils {
return ops.current();
}
-
- private int maxCommitRetries() {
- return configurationStore.getConfiguration(
- realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES);
- }
}