This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f2be7c8c8b [core][rest] Add tableId to commit snapshot to avoid wrong
commit (#5679)
f2be7c8c8b is described below
commit f2be7c8c8be476abd9fa1a6ab0db8d1abbe1d841
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri May 30 18:17:51 2025 +0800
[core][rest] Add tableId to commit snapshot to avoid wrong commit (#5679)
---
docs/static/rest-catalog-open-api.yaml | 2 ++
.../main/java/org/apache/paimon/rest/RESTApi.java | 8 ++++++--
.../paimon/rest/requests/CommitTableRequest.java | 11 +++++++++++
.../org/apache/paimon/catalog/AbstractCatalog.java | 5 ++++-
.../java/org/apache/paimon/catalog/Catalog.java | 6 +++++-
.../paimon/catalog/CatalogSnapshotCommit.java | 14 ++++++++++----
.../org/apache/paimon/catalog/DelegateCatalog.java | 7 +++++--
.../java/org/apache/paimon/rest/RESTCatalog.java | 9 +++++----
.../apache/paimon/table/CatalogEnvironment.java | 2 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 22 +++++++++++++++-------
.../org/apache/paimon/rest/RESTCatalogTest.java | 12 ++++++++++++
11 files changed, 76 insertions(+), 22 deletions(-)
diff --git a/docs/static/rest-catalog-open-api.yaml
b/docs/static/rest-catalog-open-api.yaml
index 384bb6fe7a..f48ef9cb14 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -2395,6 +2395,8 @@ components:
CommitTableRequest:
type: object
properties:
+ tableId:
+ type: string
snapshot:
$ref: '#/components/schemas/Snapshot'
statistics:
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 9fb78cd76b..4c2f41baa5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -544,6 +544,7 @@ public class RESTApi {
* Commit snapshot for table.
*
* @param identifier database name and table name.
+ * @param tableUuid Uuid of the table to avoid wrong commit
* @param snapshot snapshot for committing
* @param statistics statistics for this snapshot incremental
* @return true if commit success
@@ -552,8 +553,11 @@ public class RESTApi {
* this table
*/
public boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics) {
- CommitTableRequest request = new CommitTableRequest(snapshot,
statistics);
+ Identifier identifier,
+ @Nullable String tableUuid,
+ Snapshot snapshot,
+ List<PartitionStatistics> statistics) {
+ CommitTableRequest request = new CommitTableRequest(tableUuid,
snapshot, statistics);
CommitTableResponse response =
client.post(
resourcePaths.commitTable(
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
index 504af0b0fa..18bb1bf999 100644
---
a/paimon-api/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
+++
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
@@ -33,9 +33,13 @@ import java.util.List;
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommitTableRequest implements RESTRequest {
+ private static final String FIELD_TABLE_ID = "tableId";
private static final String FIELD_SNAPSHOT = "snapshot";
private static final String FIELD_STATISTICS = "statistics";
+ @JsonProperty(FIELD_TABLE_ID)
+ private final String tableId;
+
@JsonProperty(FIELD_SNAPSHOT)
private final Snapshot snapshot;
@@ -44,12 +48,19 @@ public class CommitTableRequest implements RESTRequest {
@JsonCreator
public CommitTableRequest(
+ @JsonProperty(FIELD_TABLE_ID) String tableId,
@JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot,
@JsonProperty(FIELD_STATISTICS) List<PartitionStatistics>
statistics) {
+ this.tableId = tableId;
this.snapshot = snapshot;
this.statistics = statistics;
}
+ @JsonGetter(FIELD_TABLE_ID)
+ public String getTableId() {
+ return tableId;
+ }
+
@JsonGetter(FIELD_SNAPSHOT)
public Snapshot getSnapshot() {
return snapshot;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index eca8c68675..9b6e0a5c55 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -477,7 +477,10 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics) {
+ Identifier identifier,
+ @Nullable String tableUuid,
+ Snapshot snapshot,
+ List<PartitionStatistics> statistics) {
throw new UnsupportedOperationException();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 4e5fd10046..af9f10b782 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -617,6 +617,7 @@ public interface Catalog extends AutoCloseable {
* Commit the {@link Snapshot} for table identified by the given {@link
Identifier}.
*
* @param identifier Path of the table
+ * @param tableUuid Uuid of the table to avoid wrong commit
* @param snapshot Snapshot to be committed
* @param statistics statistics information of this change
* @return Success or not
@@ -625,7 +626,10 @@ public interface Catalog extends AutoCloseable {
* #supportsVersionManagement()}
*/
boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
+ Identifier identifier,
+ @Nullable String tableUuid,
+ Snapshot snapshot,
+ List<PartitionStatistics> statistics)
throws Catalog.TableNotExistException;
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
index 62e5b58ac5..c7d61a2f58 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
@@ -22,6 +22,8 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.util.List;
/** A {@link SnapshotCommit} using {@link Catalog} to commit. */
@@ -29,10 +31,12 @@ public class CatalogSnapshotCommit implements
SnapshotCommit {
private final Catalog catalog;
private final Identifier identifier;
+ @Nullable private final String uuid;
- public CatalogSnapshotCommit(Catalog catalog, Identifier identifier) {
+ public CatalogSnapshotCommit(Catalog catalog, Identifier identifier,
@Nullable String uuid) {
this.catalog = catalog;
this.identifier = identifier;
+ this.uuid = uuid;
}
@Override
@@ -40,7 +44,7 @@ public class CatalogSnapshotCommit implements SnapshotCommit {
throws Exception {
Identifier newIdentifier =
new Identifier(identifier.getDatabaseName(),
identifier.getTableName(), branch);
- return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
+ return catalog.commitSnapshot(newIdentifier, uuid, snapshot,
statistics);
}
@Override
@@ -54,14 +58,16 @@ public class CatalogSnapshotCommit implements
SnapshotCommit {
private static final long serialVersionUID = 1L;
private final CatalogLoader catalogLoader;
+ @Nullable private final String uuid;
- public Factory(CatalogLoader catalogLoader) {
+ public Factory(CatalogLoader catalogLoader, @Nullable String uuid) {
this.catalogLoader = catalogLoader;
+ this.uuid = uuid;
}
@Override
public SnapshotCommit create(Identifier identifier, SnapshotManager
snapshotManager) {
- return new CatalogSnapshotCommit(catalogLoader.load(), identifier);
+ return new CatalogSnapshotCommit(catalogLoader.load(), identifier,
uuid);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 96470c9b9b..119af609bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -212,9 +212,12 @@ public abstract class DelegateCatalog implements Catalog {
@Override
public boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
+ Identifier identifier,
+ @Nullable String tableUuid,
+ Snapshot snapshot,
+ List<PartitionStatistics> statistics)
throws TableNotExistException {
- return wrapped.commitSnapshot(identifier, snapshot, statistics);
+ return wrapped.commitSnapshot(identifier, tableUuid, snapshot,
statistics);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 72e4373efb..ea81506200 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -39,7 +39,6 @@ import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
-import org.apache.paimon.rest.requests.RollbackTableRequest;
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetFunctionResponse;
@@ -331,10 +330,13 @@ public class RESTCatalog implements Catalog {
@Override
public boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
+ Identifier identifier,
+ @Nullable String tableUuid,
+ Snapshot snapshot,
+ List<PartitionStatistics> statistics)
throws TableNotExistException {
try {
- return api.commitSnapshot(identifier, snapshot, statistics);
+ return api.commitSnapshot(identifier, tableUuid, snapshot,
statistics);
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
@@ -347,7 +349,6 @@ public class RESTCatalog implements Catalog {
@Override
public void rollbackTo(Identifier identifier, Instant instant)
throws Catalog.TableNotExistException {
- RollbackTableRequest request = new RollbackTableRequest(instant);
try {
api.rollbackTo(identifier, instant);
} catch (NoSuchResourceException e) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 3ea579e120..aeb9575bf1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -95,7 +95,7 @@ public class CatalogEnvironment implements Serializable {
public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) {
SnapshotCommit.Factory factory;
if (catalogLoader != null && supportsVersionManagement) {
- factory = new CatalogSnapshotCommit.Factory(catalogLoader);
+ factory = new CatalogSnapshotCommit.Factory(catalogLoader, uuid);
} else {
factory = new RenamingSnapshotCommit.Factory(lockFactory,
lockContext);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index d183184850..672426f7d1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -773,10 +773,11 @@ public class RESTCatalogServer {
if (!tableMetadataStore.containsKey(identifier.getFullName())) {
throw new Catalog.TableNotExistException(identifier);
}
- boolean success =
- commitSnapshot(identifier, requestBody.getSnapshot(),
requestBody.getStatistics());
- CommitTableResponse response = new CommitTableResponse(success);
- return mockResponse(response, 200);
+ return commitSnapshot(
+ identifier,
+ requestBody.getTableId(),
+ requestBody.getSnapshot(),
+ requestBody.getStatistics());
}
private MockResponse rollbackTableByIdHandle(Identifier identifier, long
snapshotId)
@@ -1864,10 +1865,16 @@ public class RESTCatalogServer {
return String.format("%s-%d", identifier.getFullName(), snapshotId);
}
- private boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
+ private MockResponse commitSnapshot(
+ Identifier identifier,
+ String tableId,
+ Snapshot snapshot,
+ List<PartitionStatistics> statistics)
throws Catalog.TableNotExistException {
FileStoreTable table = getFileTable(identifier);
+ if (!tableId.equals(table.catalogEnvironment().uuid())) {
+ throw new Catalog.TableNotExistException(identifier);
+ }
RenamingSnapshotCommit commit =
new RenamingSnapshotCommit(table.snapshotManager(),
Lock.empty());
String branchName = identifier.getBranchName();
@@ -1989,7 +1996,8 @@ public class RESTCatalogServer {
&&
partition.recordCount() <= 0);
return partitions.isEmpty();
});
- return success;
+ CommitTableResponse response = new CommitTableResponse(success);
+ return mockResponse(response, 200);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 9953264233..ec2617a934 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -308,6 +308,7 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
() ->
restCatalog.commitSnapshot(
identifier,
+ "",
createSnapshotWithMillis(1L,
System.currentTimeMillis()),
new ArrayList<PartitionStatistics>()));
}
@@ -1263,10 +1264,21 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
() ->
restCatalog.commitSnapshot(
hasSnapshotTableIdentifier,
+ "",
createSnapshotWithMillis(1L,
System.currentTimeMillis()),
new ArrayList<>()));
createTable(hasSnapshotTableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
+
+ assertThrows(
+ Catalog.TableNotExistException.class,
+ () ->
+ restCatalog.commitSnapshot(
+ hasSnapshotTableIdentifier,
+ "unknown_id",
+ createSnapshotWithMillis(1L,
System.currentTimeMillis()),
+ new ArrayList<>()));
+
long id = 10086;
long millis = System.currentTimeMillis();
updateSnapshotOnRestServer(