This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a7486b0dbc [rest] Introduce snapshot loading to REST Catalog (#5147)
a7486b0dbc is described below
commit a7486b0dbc93e6ecd515e728569dff3428dbbef5
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Feb 25 15:41:47 2025 +0800
[rest] Introduce snapshot loading to REST Catalog (#5147)
---
.../java/org/apache/paimon/AbstractFileStore.java | 7 ++-
.../org/apache/paimon/catalog/CatalogUtils.java | 6 ++-
.../SupportsSnapshots.java} | 23 ++++++---
.../java/org/apache/paimon/rest/RESTCatalog.java | 48 +++++++++++++++---
.../org/apache/paimon/rest/RESTTokenFileIO.java | 7 ++-
.../java/org/apache/paimon/rest/ResourcePaths.java | 4 ++
.../rest/responses/ErrorResponseResourceType.java | 3 +-
.../rest/responses/GetTableSnapshotResponse.java | 47 ++++++++++++++++++
.../org/apache/paimon/schema/SchemaManager.java | 3 +-
.../apache/paimon/table/CatalogEnvironment.java | 17 ++++++-
.../org/apache/paimon/tag/SnapshotLoaderImpl.java | 57 ++++++++++++++++++++++
.../SnapshotLoader.java} | 20 +++++---
.../org/apache/paimon/utils/SnapshotManager.java | 30 +++++++-----
.../test/java/org/apache/paimon/SnapshotTest.java | 17 ++++++-
.../append/AppendOnlyTableCompactionTest.java | 3 +-
.../paimon/operation/PartitionExpireTest.java | 2 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 24 +++++++++
.../org/apache/paimon/rest/RESTCatalogTest.java | 22 +++++++++
.../paimon/table/FileStoreTableTestBase.java | 9 ++--
.../table/IncrementalTimeStampTableTest.java | 9 ++--
.../apache/paimon/table/system/FilesTableTest.java | 3 +-
.../paimon/table/system/ManifestsTableTest.java | 3 +-
.../paimon/table/system/SnapshotsTableTest.java | 3 +-
.../apache/paimon/utils/SnapshotManagerTest.java | 18 ++++---
.../paimon/flink/ContinuousFileStoreITCase.java | 3 +-
.../org/apache/paimon/flink/CatalogITCaseBase.java | 6 ++-
.../paimon/flink/ContinuousFileStoreITCase.java | 3 +-
.../apache/paimon/flink/RescaleBucketITCase.java | 3 +-
.../paimon/flink/sink/CommitterOperatorTest.java | 3 +-
.../paimon/flink/sink/StoreMultiCommitterTest.java | 6 +--
.../flink/source/TestChangelogDataReadWrite.java | 3 +-
paimon-open-api/rest-catalog-open-api.yaml | 42 ++++++++++++++++
.../paimon/open/api/RESTCatalogController.java | 26 ++++++++++
33 files changed, 410 insertions(+), 70 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index b3398035b5..b06c542629 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -168,7 +168,12 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
@Override
public SnapshotManager snapshotManager() {
- return new SnapshotManager(fileIO, options.path(), options.branch(),
snapshotCache);
+ return new SnapshotManager(
+ fileIO,
+ options.path(),
+ options.branch(),
+ catalogEnvironment.snapshotLoader(),
+ snapshotCache);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 14b49521bf..de7ec83755 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -193,7 +193,11 @@ public class CatalogUtils {
CatalogEnvironment catalogEnv =
new CatalogEnvironment(
- identifier, metadata.uuid(), catalog.catalogLoader(),
commitFactory);
+ identifier,
+ metadata.uuid(),
+ catalog.catalogLoader(),
+ commitFactory,
+ catalog instanceof SupportsSnapshots);
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table =
FileStoreTableFactory.create(dataFileIO.apply(path), path,
schema, catalogEnv);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java
similarity index 58%
copy from
paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
copy to
paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java
index 5dc6cffade..1df80c131d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java
@@ -16,12 +16,21 @@
* limitations under the License.
*/
-package org.apache.paimon.rest.responses;
+package org.apache.paimon.catalog;
-/** The type of resource that caused the error. */
-public enum ErrorResponseResourceType {
- DATABASE,
- TABLE,
- COLUMN,
- VIEW
+import org.apache.paimon.Snapshot;
+
+import java.util.Optional;
+
+/** A {@link Catalog} that supports loading table snapshots. */
+public interface SupportsSnapshots {
+
+ /**
+ * Return the snapshot of the table identified by the given {@link Identifier}.
+ *
+ * @param identifier Path of the table
+ * @return The requested snapshot of the table, or empty if the table has no snapshot
+ * @throws Catalog.TableNotExistException if the target does not exist
+ */
+ Optional<Snapshot> loadSnapshot(Identifier identifier) throws
Catalog.TableNotExistException;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index fedaa7251d..341379e523 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.SupportsSnapshots;
import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -58,6 +59,7 @@ import
org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -83,6 +85,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
@@ -100,7 +103,7 @@ import static
org.apache.paimon.rest.auth.AuthSession.createAuthSession;
import static
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
/** A catalog implementation for REST. */
-public class RESTCatalog implements Catalog {
+public class RESTCatalog implements Catalog, SupportsSnapshots {
public static final String HEADER_PREFIX = "header.";
@@ -297,11 +300,44 @@ public class RESTCatalog implements Catalog {
}
}
- protected GetTableTokenResponse loadTableToken(Identifier identifier) {
- return client.get(
- resourcePaths.tableToken(identifier.getDatabaseName(),
identifier.getObjectName()),
- GetTableTokenResponse.class,
- restAuthFunction);
+ protected GetTableTokenResponse loadTableToken(Identifier identifier)
+ throws TableNotExistException {
+ GetTableTokenResponse response;
+ try {
+ response =
+ client.get(
+ resourcePaths.tableToken(
+ identifier.getDatabaseName(),
identifier.getObjectName()),
+ GetTableTokenResponse.class,
+ restAuthFunction);
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ return response;
+ }
+
+ @Override
+ public Optional<Snapshot> loadSnapshot(Identifier identifier) throws
TableNotExistException {
+ GetTableSnapshotResponse response;
+ try {
+ response =
+ client.get(
+ resourcePaths.tableSnapshot(
+ identifier.getDatabaseName(),
identifier.getObjectName()),
+ GetTableSnapshotResponse.class,
+ restAuthFunction);
+ } catch (NoSuchResourceException e) {
+ if (e.resourceType() == ErrorResponseResourceType.SNAPSHOT) {
+ return Optional.empty();
+ }
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+
+ return Optional.of(response.getSnapshot());
}
public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index 42dddf3411..dfb4c6546d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -18,6 +18,7 @@
package org.apache.paimon.rest;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
@@ -182,7 +183,11 @@ public class RESTTokenFileIO implements FileIO {
private void refreshToken() {
GetTableTokenResponse response;
if (catalogInstance != null) {
- response = catalogInstance.loadTableToken(identifier);
+ try {
+ response = catalogInstance.loadTableToken(identifier);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
} else {
try (RESTCatalog catalog = catalogLoader.load()) {
response = catalog.loadTableToken(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 69560f36e3..de6a35d010 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -74,6 +74,10 @@ public class ResourcePaths {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES,
tableName, "token");
}
+ public String tableSnapshot(String databaseName, String tableName) {
+ return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES,
tableName, "snapshot");
+ }
+
public String partitions(String databaseName, String tableName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES,
tableName, "partitions");
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
index 5dc6cffade..cb05c6c6c5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
@@ -23,5 +23,6 @@ public enum ErrorResponseResourceType {
DATABASE,
TABLE,
COLUMN,
- VIEW
+ VIEW,
+ SNAPSHOT
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableSnapshotResponse.java
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableSnapshotResponse.java
new file mode 100644
index 0000000000..c1d25d0b40
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableSnapshotResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.rest.RESTResponse;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response for table snapshot. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GetTableSnapshotResponse implements RESTResponse {
+
+ private static final String FIELD_SNAPSHOT = "snapshot";
+
+ @JsonProperty(FIELD_SNAPSHOT)
+ private final Snapshot snapshot;
+
+ @JsonCreator
+ public GetTableSnapshotResponse(@JsonProperty(FIELD_SNAPSHOT) Snapshot
snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ @JsonGetter(FIELD_SNAPSHOT)
+ public Snapshot getSnapshot() {
+ return snapshot;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 98e3b48fe4..6213503a47 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -270,7 +270,8 @@ public class SchemaManager implements Serializable {
public TableSchema commitChanges(List<SchemaChange> changes)
throws Catalog.TableNotExistException,
Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
- SnapshotManager snapshotManager = new SnapshotManager(fileIO,
tableRoot, branch);
+ SnapshotManager snapshotManager =
+ new SnapshotManager(fileIO, tableRoot, branch, null, null);
boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);
while (true) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 9ae7a8b028..8490dfb562 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -22,6 +22,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.tag.SnapshotLoaderImpl;
+import org.apache.paimon.utils.SnapshotLoader;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -37,20 +39,23 @@ public class CatalogEnvironment implements Serializable {
@Nullable private final String uuid;
@Nullable private final CatalogLoader catalogLoader;
@Nullable private final SnapshotCommit.Factory commitFactory;
+ private final boolean supportsSnapshots;
public CatalogEnvironment(
@Nullable Identifier identifier,
@Nullable String uuid,
@Nullable CatalogLoader catalogLoader,
- @Nullable SnapshotCommit.Factory commitFactory) {
+ @Nullable SnapshotCommit.Factory commitFactory,
+ boolean supportsSnapshots) {
this.identifier = identifier;
this.uuid = uuid;
this.catalogLoader = catalogLoader;
this.commitFactory = commitFactory;
+ this.supportsSnapshots = supportsSnapshots;
}
public static CatalogEnvironment empty() {
- return new CatalogEnvironment(null, null, null, null);
+ return new CatalogEnvironment(null, null, null, null, false);
}
@Nullable
@@ -79,6 +84,14 @@ public class CatalogEnvironment implements Serializable {
return PartitionHandler.create(catalogLoader.load(), identifier);
}
+ @Nullable
+ public SnapshotLoader snapshotLoader() {
+ if (catalogLoader == null || !supportsSnapshots) {
+ return null;
+ }
+ return new SnapshotLoaderImpl(catalogLoader, identifier);
+ }
+
@VisibleForTesting
public SnapshotCommit.Factory commitFactory() {
return commitFactory;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/SnapshotLoaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/tag/SnapshotLoaderImpl.java
new file mode 100644
index 0000000000..bd275ee29c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/SnapshotLoaderImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.SupportsSnapshots;
+import org.apache.paimon.utils.SnapshotLoader;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Implementation of {@link SnapshotLoader}. */
+public class SnapshotLoaderImpl implements SnapshotLoader {
+
+ private final CatalogLoader catalogLoader;
+ private final Identifier identifier;
+
+ public SnapshotLoaderImpl(CatalogLoader catalogLoader, Identifier
identifier) {
+ this.catalogLoader = catalogLoader;
+ this.identifier = identifier;
+ }
+
+ @Override
+ public Optional<Snapshot> load() throws IOException {
+ try (Catalog catalog = catalogLoader.load()) {
+ return ((SupportsSnapshots) catalog).loadSnapshot(identifier);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public SnapshotLoader copyWithBranch(String branch) {
+ return new SnapshotLoaderImpl(
+ catalogLoader,
+ new Identifier(identifier.getDatabaseName(),
identifier.getTableName(), branch));
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotLoader.java
similarity index 69%
copy from
paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
copy to paimon-core/src/main/java/org/apache/paimon/utils/SnapshotLoader.java
index 5dc6cffade..b0957ba574 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotLoader.java
@@ -16,12 +16,18 @@
* limitations under the License.
*/
-package org.apache.paimon.rest.responses;
+package org.apache.paimon.utils;
-/** The type of resource that caused the error. */
-public enum ErrorResponseResourceType {
- DATABASE,
- TABLE,
- COLUMN,
- VIEW
+import org.apache.paimon.Snapshot;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Optional;
+
+/** Loader to load the latest snapshot. */
+public interface SnapshotLoader extends Serializable {
+
+ Optional<Snapshot> load() throws IOException;
+
+ SnapshotLoader copyWithBranch(String branch);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 8b7c91d1f8..5147a19838 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -54,7 +55,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
-import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
@@ -78,30 +78,28 @@ public class SnapshotManager implements Serializable {
private final FileIO fileIO;
private final Path tablePath;
private final String branch;
+ @Nullable private final SnapshotLoader snapshotLoader;
@Nullable private final Cache<Path, Snapshot> cache;
- public SnapshotManager(FileIO fileIO, Path tablePath) {
- this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
- }
-
- /** Specify the default branch for data writing. */
- public SnapshotManager(FileIO fileIO, Path tablePath, @Nullable String
branchName) {
- this(fileIO, tablePath, branchName, null);
- }
-
public SnapshotManager(
FileIO fileIO,
Path tablePath,
@Nullable String branchName,
+ @Nullable SnapshotLoader snapshotLoader,
@Nullable Cache<Path, Snapshot> cache) {
this.fileIO = fileIO;
this.tablePath = tablePath;
this.branch = BranchManager.normalizeBranch(branchName);
+ this.snapshotLoader = snapshotLoader;
this.cache = cache;
}
public SnapshotManager copyWithBranch(String branchName) {
- return new SnapshotManager(fileIO, tablePath, branchName);
+ SnapshotLoader newSnapshotLoader = null;
+ if (snapshotLoader != null) {
+ newSnapshotLoader = snapshotLoader.copyWithBranch(branchName);
+ }
+ return new SnapshotManager(fileIO, tablePath, branchName,
newSnapshotLoader, cache);
}
public FileIO fileIO() {
@@ -209,12 +207,22 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Snapshot latestSnapshot() {
+ if (snapshotLoader != null) {
+ try {
+ return snapshotLoader.load().orElse(null);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
Long snapshotId = latestSnapshotId();
return snapshotId == null ? null : snapshot(snapshotId);
}
public @Nullable Long latestSnapshotId() {
try {
+ if (snapshotLoader != null) {
+ return snapshotLoader.load().map(Snapshot::id).orElse(null);
+ }
return findLatest(snapshotDirectory(), SNAPSHOT_PREFIX,
this::snapshotPath);
} catch (IOException e) {
throw new RuntimeException("Failed to find latest snapshot id", e);
diff --git a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
index d8266c1c6d..08a5dba91e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
@@ -18,9 +18,16 @@
package org.apache.paimon;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.SnapshotManager;
+
import org.junit.jupiter.api.Test;
-class SnapshotTest {
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+
+/** Test for snapshots. */
+public class SnapshotTest {
@Test
public void testJsonIgnoreProperties() {
@@ -41,4 +48,12 @@ class SnapshotTest {
+ " \"unknownKey\" : 22222\n"
+ "}");
}
+
+ public static SnapshotManager newSnapshotManager(FileIO fileIO, Path
tablePath) {
+ return newSnapshotManager(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+ }
+
+ public static SnapshotManager newSnapshotManager(FileIO fileIO, Path
tablePath, String branch) {
+ return new SnapshotManager(fileIO, tablePath, branch, null, null);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
index 20f2382b09..4489fe9a41 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
@@ -48,6 +48,7 @@ import java.util.Random;
import java.util.UUID;
import static java.util.Collections.singletonMap;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for append table compaction. */
@@ -69,7 +70,7 @@ public class AppendOnlyTableCompactionTest {
FileIO fileIO = new LocalFileIO();
path = new org.apache.paimon.fs.Path(tempDir.toString());
tableSchema = new SchemaManager(fileIO, path).createTable(schema());
- snapshotManager = new SnapshotManager(fileIO, path);
+ snapshotManager = newSnapshotManager(fileIO, path);
recreate();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 74d79d452a..8f223e4225 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -133,7 +133,7 @@ public class PartitionExpireTest {
};
CatalogEnvironment env =
- new CatalogEnvironment(null, null, null, null) {
+ new CatalogEnvironment(null, null, null, null, false) {
@Override
public PartitionHandler partitionHandler() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index e9b5b7f0a0..04f514ef2e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -45,6 +45,7 @@ import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -74,6 +75,7 @@ import java.util.Optional;
import java.util.UUID;
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
+import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
/** Mock REST server for testing. */
public class RESTCatalogServer {
@@ -159,6 +161,10 @@ public class RESTCatalogServer {
resources.length == 4
&& "tables".equals(resources[1])
&& "token".equals(resources[3]);
+ boolean isTableSnapshot =
+ resources.length == 4
+ && "tables".equals(resources[1])
+ && "snapshot".equals(resources[3]);
boolean isPartitions =
resources.length == 4
&& "tables".equals(resources[1])
@@ -243,6 +249,24 @@ public class RESTCatalogServer {
.setBody(
OBJECT_MAPPER.writeValueAsString(
getTableTokenResponse));
+ } else if (isTableSnapshot) {
+ if (!"my_snapshot_table".equals(resources[2])) {
+ response =
+ new ErrorResponse(
+
ErrorResponseResourceType.SNAPSHOT,
+ databaseName,
+ "No Snapshot",
+ 404);
+ return mockResponse(response, 404);
+ }
+ GetTableSnapshotResponse getTableSnapshotResponse =
+ new GetTableSnapshotResponse(
+ createSnapshotWithMillis(10086,
100));
+ return new MockResponse()
+ .setResponseCode(200)
+ .setBody(
+ OBJECT_MAPPER.writeValueAsString(
+ getTableSnapshotResponse));
} else if (isTableRename) {
return renameTableApiHandler(catalog, request);
} else if (isTableCommit) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index a0d17057a2..20e3127a8a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -18,6 +18,8 @@
package org.apache.paimon.rest;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
@@ -45,8 +47,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -147,6 +151,24 @@ class RESTCatalogTest extends CatalogTestBase {
}
}
+ @Test
+ void testSnapshotFromREST() throws Catalog.TableNotExistException {
+ Options options = new Options();
+ options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+ options.set(RESTCatalogOptions.TOKEN, initToken);
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
+ RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
+
+ Optional<Snapshot> snapshot =
+ catalog.loadSnapshot(Identifier.create("test_db_a",
"my_snapshot_table"));
+ assertThat(snapshot).isPresent();
+ assertThat(snapshot.get().id()).isEqualTo(10086);
+ assertThat(snapshot.get().timeMillis()).isEqualTo(100);
+
+ snapshot = catalog.loadSnapshot(Identifier.create("test_db_a",
"unknown"));
+ assertThat(snapshot).isEmpty();
+ }
+
@Override
protected boolean supportsFormatTable() {
return true;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 34e05dd8bd..aa96c3b7df 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -117,6 +117,7 @@ import static
org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
@@ -667,7 +668,7 @@ public abstract class FileStoreTableTestBase {
}
SnapshotManager snapshotManager =
- new SnapshotManager(FileIOFinder.find(tablePath),
table.location());
+ newSnapshotManager(FileIOFinder.find(tablePath),
table.location());
Long latestSnapshotId = snapshotManager.latestSnapshotId();
assertThat(latestSnapshotId).isNotNull();
for (int i = 1; i <= latestSnapshotId; i++) {
@@ -1111,7 +1112,7 @@ public abstract class FileStoreTableTestBase {
// snapshot 2
write.write(rowData(2, 20, 200L));
commit.commit(1, write.prepareCommit(false, 2));
- SnapshotManager snapshotManager = new SnapshotManager(new
TraceableFileIO(), tablePath);
+ SnapshotManager snapshotManager = newSnapshotManager(new
TraceableFileIO(), tablePath);
// The snapshot 1 is expired.
assertThat(snapshotManager.snapshotExists(1)).isFalse();
table.createTag("test-tag-2", 1);
@@ -1186,7 +1187,7 @@ public abstract class FileStoreTableTestBase {
// verify snapshot in test-branch is equal to snapshot 2
SnapshotManager snapshotManager =
- new SnapshotManager(new TraceableFileIO(), tablePath,
"test-branch");
+ newSnapshotManager(new TraceableFileIO(), tablePath,
"test-branch");
Snapshot branchSnapshot =
Snapshot.fromPath(new TraceableFileIO(),
snapshotManager.snapshotPath(2));
assertThat(branchSnapshot.equals(snapshot2)).isTrue();
@@ -1334,7 +1335,7 @@ public abstract class FileStoreTableTestBase {
"2|20|200|binary|varbinary|mapKey:mapVal|multiset");
// verify snapshot in branch1 and main branch is same
- SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
+ SnapshotManager snapshotManager = newSnapshotManager(new TraceableFileIO(), tablePath);
Snapshot branchSnapshot =
Snapshot.fromPath(
new TraceableFileIO(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
index 13c9f87838..1516f5bf2a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -56,7 +57,7 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
- SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+ SnapshotManager snapshotManager = newSnapshotManager(LocalFileIO.create(), tablePath);
Long timestampEarliest = System.currentTimeMillis();
// snapshot 1: append
@@ -145,7 +146,7 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
- SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+ SnapshotManager snapshotManager = newSnapshotManager(LocalFileIO.create(), tablePath);
// snapshot 1: append
write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1),
GenericRow.of(1, 3, 1));
@@ -183,7 +184,7 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
- SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+ SnapshotManager snapshotManager = newSnapshotManager(LocalFileIO.create(), tablePath);
// snapshot 1: append
write(
@@ -240,7 +241,7 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
- SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+ SnapshotManager snapshotManager = newSnapshotManager(LocalFileIO.create(), tablePath);
Long timestampEarliest = System.currentTimeMillis();
// snapshot 1: append
write(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 82aefcbdd3..f3f55ac197 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -53,6 +53,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
@@ -91,7 +92,7 @@ public class FilesTableTest extends TableTestBase {
Identifier filesTableId =
identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER +
FilesTable.FILES);
filesTable = (FilesTable) catalog.getTable(filesTableId);
- snapshotManager = new SnapshotManager(fileIO, tablePath);
+ snapshotManager = newSnapshotManager(fileIO, tablePath);
// snapshot 1: append
write(table, GenericRow.of(1, 1, 10, 1), GenericRow.of(1, 2, 20, 5));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index f375dfd2c8..b16d0f82a8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThrows;
@@ -76,7 +77,7 @@ public class ManifestsTableTest extends TableTestBase {
FileIO fileIO = LocalFileIO.create();
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
- snapshotManager = new SnapshotManager(fileIO, tablePath);
+ snapshotManager = newSnapshotManager(fileIO, tablePath);
ManifestList.Factory factory =
new ManifestList.Factory(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
index 44381c124d..a7055c5c5e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
@@ -48,6 +48,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SnapshotsTable}. */
@@ -71,7 +72,7 @@ public class SnapshotsTableTest extends TableTestBase {
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
.option(CoreOptions.BUCKET.key(), "2")
.build();
- snapshotManager = new SnapshotManager(fileIO, tablePath);
+ snapshotManager = newSnapshotManager(fileIO, tablePath);
TableSchema tableSchema =
SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath),
schema);
FileStoreTable table =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 61f6135b16..fe79938211 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -53,7 +55,7 @@ public class SnapshotManagerTest {
@Test
public void testSnapshotPath() {
SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(), new Path(tempDir.toString()));
+ newSnapshotManager(LocalFileIO.create(), new Path(tempDir.toString()));
for (int i = 0; i < 20; i++) {
assertThat(snapshotManager.snapshotPath(i))
.isEqualTo(new Path(tempDir.toString() +
"/snapshot/snapshot-" + i));
@@ -203,7 +205,7 @@ public class SnapshotManagerTest {
long millis = 1684726826L;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ newSnapshotManager(localFileIO, new Path(tempDir.toString()));
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
@@ -239,7 +241,7 @@ public class SnapshotManagerTest {
assertThat(snapshotManager.laterOrEqualWatermark(millis +
999)).isNull();
}
- private Snapshot createSnapshotWithMillis(long id, long millis) {
+ public static Snapshot createSnapshotWithMillis(long id, long millis) {
return new Snapshot(
id,
0L,
@@ -304,7 +306,7 @@ public class SnapshotManagerTest {
public void testLatestSnapshotOfUser() throws IOException,
InterruptedException {
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ newSnapshotManager(localFileIO, new Path(tempDir.toString()));
// create 100 snapshots using user "lastCommitUser"
for (long i = 0; i < 100; i++) {
Snapshot snapshot =
@@ -353,7 +355,7 @@ public class SnapshotManagerTest {
public void testTraversalSnapshotsFromLatestSafely() throws IOException,
InterruptedException {
FileIO localFileIO = LocalFileIO.create();
Path path = new Path(tempDir.toString());
- SnapshotManager snapshotManager = new SnapshotManager(localFileIO, path);
+ SnapshotManager snapshotManager = newSnapshotManager(localFileIO, path);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot =
@@ -449,7 +451,7 @@ public class SnapshotManagerTest {
public void testLongLivedChangelog() throws Exception {
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ newSnapshotManager(localFileIO, new Path(tempDir.toString()));
long millis = 1L;
for (long i = 1; i <= 5; i++) {
Changelog changelog = createChangelogWithMillis(i, millis + i *
1000);
@@ -474,7 +476,7 @@ public class SnapshotManagerTest {
public void testCommitChangelogWhenSameChangelogCommitTwice() throws
IOException {
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ newSnapshotManager(localFileIO, new Path(tempDir.toString()));
long id = 1L;
Changelog changelog = createChangelogWithMillis(id, 1L);
snapshotManager.commitChangelog(changelog, id);
@@ -492,7 +494,7 @@ public class SnapshotManagerTest {
private boolean deleteEarliestSnapshot = false;
public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean
isRaceCondition) {
- super(fileIO, tablePath);
+ super(fileIO, tablePath, DEFAULT_MAIN_BRANCH, null, null);
this.isRaceCondition = isRaceCondition;
}
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index d8fcddd1c8..1d29080bb3 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -121,7 +122,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
iterator.close();
SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(), getTableDirectory("T1"));
+ newSnapshotManager(LocalFileIO.create(), getTableDirectory("T1"));
List<Snapshot> snapshots =
new
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index 01d615c3e1..32c09c35fc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -58,6 +58,8 @@ import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
+
/** ITCase for catalog. */
public abstract class CatalogITCaseBase extends AbstractTestBase {
@@ -204,7 +206,7 @@ public abstract class CatalogITCaseBase extends
AbstractTestBase {
@Nullable
protected Snapshot findLatestSnapshot(String tableName) {
SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(), getTableDirectory(tableName));
+ newSnapshotManager(LocalFileIO.create(), getTableDirectory(tableName));
Long id = snapshotManager.latestSnapshotId();
return id == null ? null : snapshotManager.snapshot(id);
}
@@ -212,7 +214,7 @@ public abstract class CatalogITCaseBase extends
AbstractTestBase {
@Nullable
protected Snapshot findSnapshot(String tableName, long snapshotId) {
SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(), getTableDirectory(tableName));
+ newSnapshotManager(LocalFileIO.create(), getTableDirectory(tableName));
Long id = snapshotManager.latestSnapshotId();
return id == null ? null : id >= snapshotId ?
snapshotManager.snapshot(snapshotId) : null;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index b448858328..689afe09fa 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -44,6 +44,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -264,7 +265,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
iterator.close();
SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(), getTableDirectory("T1"));
+ newSnapshotManager(LocalFileIO.create(), getTableDirectory("T1"));
List<Snapshot> snapshots =
new
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index d44ba03f12..9b981102ec 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -120,7 +121,7 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
Snapshot lastSnapshot = findLatestSnapshot("T3");
assertThat(lastSnapshot).isNotNull();
SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(), getTableDirectory("T3"));
+ newSnapshotManager(LocalFileIO.create(), getTableDirectory("T3"));
for (long snapshotId = lastSnapshot.id();
snapshotId > snapshotAfterRescale.id();
snapshotId--) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 28c93ca79b..1981abd373 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -67,6 +67,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -158,7 +159,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
testHarness.snapshot(cpId, 1);
testHarness.notifyOfCompletedCheckpoint(cpId);
- SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+ SnapshotManager snapshotManager = newSnapshotManager(LocalFileIO.create(), tablePath);
// should create 10 snapshots
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(cpId);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 53e3a6dcb7..1958d15a3f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -72,6 +72,7 @@ import java.util.Objects;
import java.util.UUID;
import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -289,10 +290,9 @@ class StoreMultiCommitterTest {
testHarness.snapshot(cpId, 1);
testHarness.notifyOfCompletedCheckpoint(cpId);
- SnapshotManager snapshotManager1 =
- new SnapshotManager(LocalFileIO.create(), firstTablePath);
+ SnapshotManager snapshotManager1 = newSnapshotManager(LocalFileIO.create(), firstTablePath);
SnapshotManager snapshotManager2 =
- new SnapshotManager(LocalFileIO.create(), secondTablePath);
+ newSnapshotManager(LocalFileIO.create(), secondTablePath);
// should create 10 snapshots for first table
assertThat(snapshotManager1.latestSnapshotId()).isEqualTo(cpId);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index a9107a78fc..8b4dcf979f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -62,6 +62,7 @@ import java.util.Map;
import java.util.UUID;
import static java.util.Collections.singletonList;
+import static org.apache.paimon.SnapshotTest.newSnapshotManager;
/** Util class to read and write data for source tests. */
public class TestChangelogDataReadWrite {
@@ -113,7 +114,7 @@ public class TestChangelogDataReadWrite {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null);
- this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
+ this.snapshotManager = newSnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
}
diff --git a/paimon-open-api/rest-catalog-open-api.yaml
b/paimon-open-api/rest-catalog-open-api.yaml
index 014c3ee662..4b36bcb3fd 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -469,6 +469,43 @@ paths:
$ref: '#/components/schemas/ErrorResponse'
"500":
description: Internal Server Error
+ /v1/{prefix}/databases/{database}/tables/{table}/snapshot:
+ get:
+ tags:
+ - table
+ summary: Get table snapshot
+ operationId: getTableSnapshot
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: table
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ "200":
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GetTableSnapshotResponse'
+ "404":
+ description: Resource not found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
+ "500":
+ description: Internal Server Error
/v1/{prefix}/databases/{database}/tables/{table}/partitions:
get:
tags:
@@ -1215,6 +1252,11 @@ components:
expiresAt:
type: integer
format: int64
+ GetTableSnapshotResponse:
+ type: object
+ properties:
+ snapshot:
+ $ref: '#/components/schemas/Snapshot'
AlterDatabaseRequest:
type: object
properties:
diff --git
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 1ed5d1cca6..059d593c1a 100644
---
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -38,6 +38,7 @@ import
org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -383,6 +384,31 @@ public class RESTCatalogController {
ImmutableMap.of("key", "value"), System.currentTimeMillis());
}
+ @Operation(
+ summary = "Get table snapshot",
+ tags = {"table"})
+ @ApiResponses({
+ @ApiResponse(
+ responseCode = "200",
+ content = {
+ @Content(schema = @Schema(implementation = GetTableSnapshotResponse.class))
+ }),
+ @ApiResponse(
+ responseCode = "404",
+ description = "Resource not found",
+ content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}),
+ @ApiResponse(
+ responseCode = "500",
+ content = {@Content(schema = @Schema())})
+ })
+ @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/snapshot")
+ public GetTableSnapshotResponse getTableSnapshot(
+ @PathVariable String prefix,
+ @PathVariable String database,
+ @PathVariable String table) {
+ return new GetTableSnapshotResponse(null);
+ }
+
@Operation(
summary = "List partitions",
tags = {"partition"})