This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 998ad3ea30 [core] RESTCatalog: add table test and fix branch (#5313)
998ad3ea30 is described below
commit 998ad3ea303fdafb34f13ab1e8b697eba4d8269f
Author: jerry <[email protected]>
AuthorDate: Mon Mar 24 16:42:45 2025 +0800
[core] RESTCatalog: add table test and fix branch (#5313)
---
.../org/apache/paimon/utils/BranchManager.java | 29 ++++++
.../apache/paimon/utils/CatalogBranchManager.java | 27 ++++-
.../paimon/utils/FileSystemBranchManager.java | 32 ++----
...STCatalogTest.java => RESTCatalogMockTest.java} | 110 +++++++++++----------
.../org/apache/paimon/rest/RESTCatalogServer.java | 55 ++++++++---
.../apache/paimon/rest/RESTCatalogTestBase.java | 27 ++++-
.../table/SimpleTableInRESTCatalogMockTest.java | 64 ++++++++++++
.../table/SimpleTableInRESTCatalogTestBase.java | 106 ++++++++++++++++++++
.../apache/paimon/table/SimpleTableTestBase.java | 60 +++++------
paimon-open-api/rest-catalog-open-api.yaml | 12 +--
10 files changed, 385 insertions(+), 137 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index f54eb98752..d144db7d51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -24,6 +24,8 @@ import javax.annotation.Nullable;
import java.util.List;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Manager for {@code Branch}. */
public interface BranchManager {
@@ -58,4 +60,31 @@ public interface BranchManager {
static boolean isMainBranch(String branch) {
return branch.equals(DEFAULT_MAIN_BRANCH);
}
+
+ static void validateBranch(String branchName) {
+ checkArgument(
+ !BranchManager.isMainBranch(branchName),
+ String.format(
+ "Branch name '%s' is the default branch and cannot be
used.",
+ DEFAULT_MAIN_BRANCH));
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(branchName),
+ "Branch name '%s' is blank.",
+ branchName);
+ checkArgument(
+ !branchName.chars().allMatch(Character::isDigit),
+ "Branch name cannot be pure numeric string but is '%s'.",
+ branchName);
+ }
+
+ static void fastForwardValidate(String branchName) {
+ checkArgument(
+ !branchName.equals(DEFAULT_MAIN_BRANCH),
+ "Branch name '%s' do not use in fast-forward.",
+ branchName);
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(branchName),
+ "Branch name '%s' is blank.",
+ branchName);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index d87f97badd..92357e5802 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -40,8 +40,19 @@ public class CatalogBranchManager implements BranchManager {
private void executePost(ThrowingConsumer<Catalog, Exception> func) {
executeGet(
catalog -> {
- func.accept(catalog);
- return null;
+ try {
+ func.accept(catalog);
+ return null;
+ } catch (Catalog.BranchNotExistException e) {
+ throw new IllegalArgumentException(
+ String.format("Branch name '%s' doesn't
exist.", e.branch()));
+ } catch (Catalog.TagNotExistException e) {
+ throw new IllegalArgumentException(
+ String.format("Tag '%s' doesn't exist.",
e.tag()));
+ } catch (Catalog.BranchAlreadyExistException e) {
+ throw new IllegalArgumentException(
+ String.format("Branch name '%s' already
exists..", e.branch()));
+ }
});
}
@@ -62,7 +73,11 @@ public class CatalogBranchManager implements BranchManager {
@Override
public void createBranch(String branchName, @Nullable String tagName) {
- executePost(catalog -> catalog.createBranch(identifier, branchName,
tagName));
+ executePost(
+ catalog -> {
+ BranchManager.validateBranch(branchName);
+ catalog.createBranch(identifier, branchName, tagName);
+ });
}
@Override
@@ -72,7 +87,11 @@ public class CatalogBranchManager implements BranchManager {
@Override
public void fastForward(String branchName) {
- executePost(catalog -> catalog.fastForward(identifier, branchName));
+ executePost(
+ catalog -> {
+ BranchManager.fastForwardValidate(branchName);
+ catalog.fastForward(identifier, branchName);
+ });
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
index 3d4dd7df43..ab3905f2ec 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
@@ -74,7 +74,6 @@ public class FileSystemBranchManager implements BranchManager
{
@Override
public void createBranch(String branchName) {
validateBranch(branchName);
-
try {
TableSchema latestSchema = schemaManager.latest().get();
copySchemasToBranch(branchName, latestSchema.id());
@@ -139,14 +138,7 @@ public class FileSystemBranchManager implements
BranchManager {
@Override
public void fastForward(String branchName) {
- checkArgument(
- !branchName.equals(DEFAULT_MAIN_BRANCH),
- "Branch name '%s' do not use in fast-forward.",
- branchName);
- checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(branchName),
- "Branch name '%s' is blank.",
- branchName);
+ BranchManager.fastForwardValidate(branchName);
checkArgument(branchExists(branchName), "Branch name '%s' doesn't
exist.", branchName);
Long earliestSnapshotId =
snapshotManager.copyWithBranch(branchName).earliestSnapshotId();
@@ -207,6 +199,11 @@ public class FileSystemBranchManager implements
BranchManager {
return fileExists(branchPath);
}
+ public void validateBranch(String branchName) {
+ BranchManager.validateBranch(branchName);
+ checkArgument(!branchExists(branchName), "Branch name '%s' already
exists.", branchName);
+ }
+
@Override
public List<String> branches() {
try {
@@ -218,23 +215,6 @@ public class FileSystemBranchManager implements
BranchManager {
}
}
- private void validateBranch(String branchName) {
- checkArgument(
- !BranchManager.isMainBranch(branchName),
- String.format(
- "Branch name '%s' is the default branch and cannot be
used.",
- DEFAULT_MAIN_BRANCH));
- checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(branchName),
- "Branch name '%s' is blank.",
- branchName);
- checkArgument(!branchExists(branchName), "Branch name '%s' already
exists.", branchName);
- checkArgument(
- !branchName.chars().allMatch(Character::isDigit),
- "Branch name cannot be pure numeric string but is '%s'.",
- branchName);
- }
-
private void copySchemasToBranch(String branchName, long schemaId) throws
IOException {
for (int i = 0; i <= schemaId; i++) {
if (schemaManager.schemaExists(i)) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogMockTest.java
similarity index 76%
rename from
paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogMockTest.java
index e9370760f8..65770bdecb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogMockTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -50,7 +51,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** Test REST Catalog on Mocked REST server. */
-class MockRESTCatalogTest extends RESTCatalogTestBase {
+class RESTCatalogMockTest extends RESTCatalogTestBase {
private RESTCatalogServer restCatalogServer;
private final String initToken = "init_token";
@@ -58,32 +59,21 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
private final String serverDefineHeaderValue = "test-value";
private String dataPath;
private AuthProvider authProvider;
+ private Map<String, String> authMap;
@BeforeEach
@Override
public void setUp() throws Exception {
super.setUp();
dataPath = warehouse;
- String restWarehouse = UUID.randomUUID().toString();
- this.config =
- new ConfigResponse(
- ImmutableMap.of(
- RESTCatalogInternalOptions.PREFIX.key(),
- "paimon",
- "header." + serverDefineHeaderName,
- serverDefineHeaderValue,
- CatalogOptions.WAREHOUSE.key(),
- restWarehouse),
- ImmutableMap.of());
this.authProvider = new BearTokenAuthProvider(initToken);
- restCatalogServer =
- new RESTCatalogServer(dataPath, authProvider, this.config,
restWarehouse);
- restCatalogServer.start();
- options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
- options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
- options.set(RESTCatalogOptions.TOKEN, initToken);
- options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
- this.restCatalog = new RESTCatalog(CatalogContext.create(options));
+ this.authMap =
+ ImmutableMap.of(
+ RESTCatalogOptions.TOKEN.key(),
+ initToken,
+ RESTCatalogOptions.TOKEN_PROVIDER.key(),
+ AuthProviderEnum.BEAR.identifier());
+ this.restCatalog = initCatalog(false);
this.catalog = restCatalog;
}
@@ -105,30 +95,24 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
@Test
void testDlfStSTokenAuth() throws Exception {
- String restWarehouse = UUID.randomUUID().toString();
String akId = "akId" + UUID.randomUUID();
String akSecret = "akSecret" + UUID.randomUUID();
String securityToken = "securityToken" + UUID.randomUUID();
String region = "cn-hangzhou";
- DLFAuthProvider authProvider =
- DLFAuthProvider.buildAKToken(akId, akSecret, securityToken,
region);
- restCatalogServer =
- new RESTCatalogServer(dataPath, authProvider, this.config,
restWarehouse);
- restCatalogServer.start();
- options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
- options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
- options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.DLF.identifier());
- options.set(RESTCatalogOptions.DLF_REGION, region);
- options.set(RESTCatalogOptions.DLF_ACCESS_KEY_ID, akId);
- options.set(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET, akSecret);
- options.set(RESTCatalogOptions.DLF_SECURITY_TOKEN, securityToken);
- RESTCatalog restCatalog = new
RESTCatalog(CatalogContext.create(options));
+ this.authProvider = DLFAuthProvider.buildAKToken(akId, akSecret,
securityToken, region);
+ this.authMap =
+ ImmutableMap.of(
+ RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.DLF.identifier(),
+ RESTCatalogOptions.DLF_REGION.key(), region,
+ RESTCatalogOptions.DLF_ACCESS_KEY_ID.key(), akId,
+ RESTCatalogOptions.DLF_ACCESS_KEY_SECRET.key(),
akSecret,
+ RESTCatalogOptions.DLF_SECURITY_TOKEN.key(),
securityToken);
+ RESTCatalog restCatalog = initCatalog(false);
testDlfAuth(restCatalog);
}
@Test
void testDlfStSTokenPathAuth() throws Exception {
- String restWarehouse = UUID.randomUUID().toString();
String region = "cn-hangzhou";
String tokenPath = dataPath + UUID.randomUUID();
generateTokenAndWriteToFile(tokenPath);
@@ -138,17 +122,13 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
new Options(
ImmutableMap.of(
RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath)));
- DLFAuthProvider authProvider =
- DLFAuthProvider.buildRefreshToken(tokenLoader, 1000_000L,
region);
- restCatalogServer =
- new RESTCatalogServer(dataPath, authProvider, this.config,
restWarehouse);
- restCatalogServer.start();
- options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
- options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
- options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.DLF.identifier());
- options.set(RESTCatalogOptions.DLF_REGION, region);
- options.set(RESTCatalogOptions.DLF_TOKEN_PATH, tokenPath);
- RESTCatalog restCatalog = new
RESTCatalog(CatalogContext.create(options));
+ this.authProvider = DLFAuthProvider.buildRefreshToken(tokenLoader,
1000_000L, region);
+ this.authMap =
+ ImmutableMap.of(
+ RESTCatalogOptions.TOKEN_PROVIDER.key(),
AuthProviderEnum.DLF.identifier(),
+ RESTCatalogOptions.DLF_REGION.key(), region,
+ RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath);
+ RESTCatalog restCatalog = initCatalog(false);
testDlfAuth(restCatalog);
File file = new File(tokenPath);
file.delete();
@@ -184,12 +164,8 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
}
@Override
- protected Catalog newRestCatalogWithDataToken() {
- options.set(RESTTokenFileIO.DATA_TOKEN_ENABLED, true);
- options.set(
- RESTTestFileIO.DATA_PATH_CONF_KEY,
- dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
- return new RESTCatalog(CatalogContext.create(options));
+ protected Catalog newRestCatalogWithDataToken() throws IOException {
+ return initCatalog(true);
}
@Override
@@ -234,4 +210,34 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
fileCount,
lastFileCreationTime);
}
+
+ private RESTCatalog initCatalog(boolean enableDataToken) throws
IOException {
+ String restWarehouse = UUID.randomUUID().toString();
+ this.config =
+ new ConfigResponse(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "paimon",
+ "header." + serverDefineHeaderName,
+ serverDefineHeaderValue,
+ RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
+ enableDataToken + "",
+ CatalogOptions.WAREHOUSE.key(),
+ restWarehouse),
+ ImmutableMap.of());
+ restCatalogServer =
+ new RESTCatalogServer(dataPath, this.authProvider,
this.config, restWarehouse);
+ restCatalogServer.start();
+ for (Map.Entry<String, String> entry : this.authMap.entrySet()) {
+ options.set(entry.getKey(), entry.getValue());
+ }
+ options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+ options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+ String path =
+ enableDataToken
+ ? dataPath.replaceFirst("file",
RESTFileIOTestLoader.SCHEME)
+ : dataPath;
+ options.set(RESTTestFileIO.DATA_PATH_CONF_KEY, path);
+ return new RESTCatalog(CatalogContext.create(options));
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 373d830019..b755c7772c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -106,6 +106,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
@@ -603,7 +604,16 @@ public class RESTCatalogServer {
FileStoreTable table = getFileTable(identifier);
String identifierWithSnapshotId =
geTableFullNameWithSnapshotId(identifier, snapshotId);
if
(tableWithSnapshotId2SnapshotStore.containsKey(identifierWithSnapshotId)) {
- rollbackTo(identifier, table, snapshotId);
+ table =
+ table.copy(
+ Collections.singletonMap(
+ SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(),
"true"));
+ long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+ table.rollbackTo(snapshotId);
+ cleanSnapshot(identifier, snapshotId, latestSnapshotId);
+ tableLatestSnapshotStore.put(
+ identifier.getFullName(),
+
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
return new MockResponse().setResponseCode(200);
}
return mockResponse(
@@ -616,12 +626,20 @@ public class RESTCatalogServer {
FileStoreTable table = getFileTable(identifier);
boolean isExist = table.tagManager().tagExists(tagName);
if (isExist) {
- table.tagManager().get(tagName);
Snapshot snapshot =
table.tagManager().getOrThrow(tagName).trimToSnapshot();
String identifierWithSnapshotId =
geTableFullNameWithSnapshotId(identifier, snapshot.id());
if
(tableWithSnapshotId2SnapshotStore.containsKey(identifierWithSnapshotId)) {
- rollbackTo(identifier, table, snapshot.id());
+ table =
+ table.copy(
+ Collections.singletonMap(
+
SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true"));
+ long latestSnapshotId =
table.snapshotManager().latestSnapshotId();
+ table.rollbackTo(tagName);
+ cleanSnapshot(identifier, snapshot.id(), latestSnapshotId);
+ tableLatestSnapshotStore.put(
+ identifier.getFullName(),
+
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
return new MockResponse().setResponseCode(200);
}
}
@@ -629,19 +647,14 @@ public class RESTCatalogServer {
new ErrorResponse(ErrorResponseResourceType.TAG, "" + tagName,
"", 404), 404);
}
- private void rollbackTo(Identifier identifier, FileStoreTable table, Long
snapshotId) {
- String identifierWithSnapshotId =
geTableFullNameWithSnapshotId(identifier, snapshotId);
- long latestSnapshotId = table.latestSnapshot().get().id();
+ private void cleanSnapshot(Identifier identifier, Long snapshotId, Long
latestSnapshotId)
+ throws IOException {
if (latestSnapshotId > snapshotId) {
for (long i = snapshotId + 1; i < latestSnapshotId + 1; i++) {
tableWithSnapshotId2SnapshotStore.remove(
geTableFullNameWithSnapshotId(identifier, i));
}
}
- table.rollbackTo(snapshotId);
- tableLatestSnapshotStore.put(
- identifier.getFullName(),
-
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
}
private MockResponse databasesApiHandler(
@@ -1060,6 +1073,14 @@ public class RESTCatalogServer {
if (resources.length == 6) {
branch = RESTUtil.decodeString(resources[4]);
branchManager.fastForward(branch);
+ branchIdentifier =
+ new Identifier(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ branch);
+ tableLatestSnapshotStore.put(
+ identifier.getFullName(),
+
tableLatestSnapshotStore.get(branchIdentifier.getFullName()));
} else {
CreateBranchRequest requestBody =
OBJECT_MAPPER.readValue(data,
CreateBranchRequest.class);
@@ -1075,6 +1096,9 @@ public class RESTCatalogServer {
identifier.getDatabaseName(),
identifier.getTableName(),
requestBody.branch());
+ tableLatestSnapshotStore.put(
+ branchIdentifier.getFullName(),
+
tableLatestSnapshotStore.get(identifier.getFullName()));
tableMetadataStore.put(
branchIdentifier.getFullName(),
tableMetadataStore.get(identifier.getFullName()));
@@ -1515,8 +1539,15 @@ public class RESTCatalogServer {
Identifier identifier, long schemaId, Schema schema, String uuid,
boolean isExternal) {
Map<String, String> options = new HashMap<>(schema.options());
Path path = catalog.getTableLocation(identifier);
- String restPath =
- path.toString().replaceFirst(LocalFileIOLoader.SCHEME,
RESTFileIOTestLoader.SCHEME);
+ String restPath = path.toString();
+ if (this.configResponse
+ .getDefaults()
+ .getOrDefault(RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
"false")
+ .equals("true")) {
+ restPath =
+ path.toString()
+ .replaceFirst(LocalFileIOLoader.SCHEME,
RESTFileIOTestLoader.SCHEME);
+ }
options.put(PATH.key(), restPath);
TableSchema tableSchema =
new TableSchema(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
index 11beccb2a8..3d507a680b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
@@ -960,6 +960,31 @@ public abstract class RESTCatalogTestBase extends
CatalogTestBase {
assertThat(result).containsExactlyInAnyOrder("+I[5]", "+I[12]",
"+I[18]");
}
+ @Test
+ public void testBranchBatchRecordsWrite() throws Exception {
+ Identifier tableIdentifier = Identifier.create("my_db", "my_table");
+
+ Identifier tableBranchIdentifier =
+ new Identifier(
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getTableName(),
+ "branch1");
+ createTable(tableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
+ FileStoreTable tableTestWrite = (FileStoreTable)
catalog.getTable(tableIdentifier);
+ // write
+ batchWrite(tableTestWrite, Lists.newArrayList(12, 5, 18));
+ restCatalog.createBranch(tableIdentifier,
tableBranchIdentifier.getBranchName(), null);
+ FileStoreTable branchTableTestWrite =
+ (FileStoreTable) catalog.getTable(tableBranchIdentifier);
+ batchWrite(branchTableTestWrite, Lists.newArrayList(1, 9, 2));
+ // read
+ List<String> result = batchRead(tableTestWrite);
+ List<String> branchResult = batchRead(branchTableTestWrite);
+ assertThat(result).containsExactlyInAnyOrder("+I[5]", "+I[12]",
"+I[18]");
+ assertThat(branchResult)
+ .containsExactlyInAnyOrder("+I[5]", "+I[12]", "+I[18]",
"+I[2]", "+I[1]", "+I[9]");
+ }
+
@Test
void testBranches() throws Exception {
String databaseName = "testBranchTable";
@@ -1099,7 +1124,7 @@ public abstract class RESTCatalogTestBase extends
CatalogTestBase {
true);
}
- protected abstract Catalog newRestCatalogWithDataToken();
+ protected abstract Catalog newRestCatalogWithDataToken() throws
IOException;
protected abstract void revokeTablePermission(Identifier identifier);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogMockTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogMockTest.java
new file mode 100644
index 0000000000..2e603af51d
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogMockTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** test table in rest catalog. */
+public class SimpleTableInRESTCatalogMockTest extends
SimpleTableInRESTCatalogTestBase {
+
+ @Override
+ protected RESTCatalog createRESTCatalog() throws IOException {
+ String restWarehouse = UUID.randomUUID().toString();
+ String initToken = UUID.randomUUID().toString();
+ ConfigResponse config =
+ new ConfigResponse(
+ ImmutableMap.of(
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "paimon",
+ CatalogOptions.WAREHOUSE.key(),
+ restWarehouse),
+ ImmutableMap.of());
+ AuthProvider authProvider = new BearTokenAuthProvider(initToken);
+ RESTCatalogServer restCatalogServer =
+ new RESTCatalogServer(dataPath, authProvider, config,
restWarehouse);
+ restCatalogServer.start();
+ Options options = new Options();
+ options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+ options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+ options.set(RESTCatalogOptions.TOKEN, initToken);
+ options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
+ return new RESTCatalog(CatalogContext.create(options));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogTestBase.java
new file mode 100644
index 0000000000..c550ffe2af
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogTestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+
+/** base test table in rest catalog. */
+public abstract class SimpleTableInRESTCatalogTestBase extends
SimpleTableTestBase {
+ protected RESTCatalog restCatalog;
+ protected String dataPath;
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ dataPath = tablePath.toString();
+ restCatalog = createRESTCatalog();
+ restCatalog.createDatabase(identifier.getDatabaseName(), true);
+ }
+
+ protected abstract RESTCatalog createRESTCatalog() throws IOException;
+
+ @BeforeEach
+ public void after() throws Exception {
+ super.after();
+ restCatalog.dropTable(identifier, true);
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable(Consumer<Options> configure,
RowType rowType)
+ throws Exception {
+ Options conf = new Options();
+ configure.accept(conf);
+ if (!conf.contains(BUCKET_KEY) && conf.get(BUCKET) != -1) {
+ conf.set(BUCKET_KEY, "a");
+ }
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ if (restCatalog
+ .listTables(identifier.getDatabaseName())
+ .contains(identifier.getTableName())) {
+ List<SchemaChange> schemaChangeList = new ArrayList<>();
+ for (Map.Entry<String, String> entry : conf.toMap().entrySet()) {
+ schemaChangeList.add(SchemaChange.setOption(entry.getKey(),
entry.getValue()));
+ }
+ restCatalog.alterTable(identifier, schemaChangeList, true);
+ } else {
+ restCatalog.createTable(identifier, tableSchema.toSchema(), false);
+ }
+ return (FileStoreTable) restCatalog.getTable(identifier);
+ }
+
+ @Override
+ protected FileStoreTable createBranchTable(String branch) throws Exception
{
+ if (!restCatalog.listBranches(identifier).contains(branch)) {
+ restCatalog.createBranch(identifier, branch, null);
+ }
+ return (FileStoreTable)
+ restCatalog.getTable(
+ new Identifier(
+ identifier.getDatabaseName(),
identifier.getTableName(), branch));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index cbf9e106ae..3ffe71a496 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
@@ -80,7 +81,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
-import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -180,16 +180,18 @@ public abstract class SimpleTableTestBase {
@TempDir java.nio.file.Path tempDir;
protected Path tablePath;
+ protected Identifier identifier;
protected String commitUser;
@BeforeEach
- public void before() {
- tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
+ public void before() throws Exception {
+ identifier = Identifier.create("default", "table_test");
+ tablePath = new Path(String.format("%s://%s", TraceableFileIO.SCHEME,
tempDir.toString()));
commitUser = UUID.randomUUID().toString();
}
@AfterEach
- public void after() throws IOException {
+ public void after() throws Exception {
// assert all connections are closed
Predicate<Path> pathPredicate = path ->
path.toString().contains(tempDir.toString());
assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
@@ -442,7 +444,8 @@ public abstract class SimpleTableTestBase {
commit.abort(messages);
FileStatus[] files =
- LocalFileIO.create().listStatus(new Path(tablePath +
"/pt=1/bucket-0"));
+ LocalFileIO.create()
+ .listStatus(new Path(table.location().toString() +
"/pt=1/bucket-0"));
assertThat(files).isEmpty();
write.close();
commit.close();
@@ -586,7 +589,6 @@ public abstract class SimpleTableTestBase {
conf.set(SNAPSHOT_NUM_RETAINED_MIN, 3);
conf.set(SNAPSHOT_NUM_RETAINED_MAX, 3);
});
-
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
for (int i = 0; i < 10; i++) {
@@ -605,7 +607,7 @@ public abstract class SimpleTableTestBase {
}
SnapshotManager snapshotManager =
- newSnapshotManager(FileIOFinder.find(tablePath),
table.location());
+ newSnapshotManager(FileIOFinder.find(table.location()),
table.location());
Long latestSnapshotId = snapshotManager.latestSnapshotId();
assertThat(latestSnapshotId).isNotNull();
for (int i = 1; i <= latestSnapshotId; i++) {
@@ -794,7 +796,7 @@ public abstract class SimpleTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ Files.walk(new
File(table.location().toUri().getPath()).toPath())
.collect(Collectors.toList());
assertThat(files.size()).isEqualTo(14);
}
@@ -820,7 +822,7 @@ public abstract class SimpleTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ Files.walk(new
File(table.location().toUri().getPath()).toPath())
.collect(Collectors.toList());
assertThat(files.size()).isEqualTo(15);
// table-path
@@ -872,7 +874,7 @@ public abstract class SimpleTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ Files.walk(new
File(table.location().toUri().getPath()).toPath())
.collect(Collectors.toList());
assertThat(files.size()).isEqualTo(16);
// case 0 plus 1:
@@ -913,7 +915,7 @@ public abstract class SimpleTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ Files.walk(new
File(table.location().toUri().getPath()).toPath())
.collect(Collectors.toList());
assertThat(files.size()).isEqualTo(23);
// case 0 plus 7:
@@ -969,7 +971,7 @@ public abstract class SimpleTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ Files.walk(new
File(table.location().toUri().getPath()).toPath())
.collect(Collectors.toList());
assertThat(files.size()).isEqualTo(16);
// rollback snapshot case 0 plus 1:
@@ -1016,7 +1018,7 @@ public abstract class SimpleTableTestBase {
table.createTag("test-tag", 2);
// verify that tag file exist
- TagManager tagManager = new TagManager(new TraceableFileIO(),
tablePath);
+ TagManager tagManager = new TagManager(new TraceableFileIO(),
table.location());
assertThat(tagManager.tagExists("test-tag")).isTrue();
// verify that test-tag is equal to snapshot 2
@@ -1040,7 +1042,7 @@ public abstract class SimpleTableTestBase {
commit.commit(0, write.prepareCommit(false, 1));
table.createTag("test-tag", 1);
// verify that tag file exist
- TagManager tagManager = new TagManager(new TraceableFileIO(),
tablePath);
+ TagManager tagManager = new TagManager(new TraceableFileIO(),
table.location());
assertThat(tagManager.tagExists("test-tag")).isTrue();
// verify that test-tag is equal to snapshot 1
Snapshot tagged =
tagManager.getOrThrow("test-tag").trimToSnapshot();
@@ -1049,7 +1051,8 @@ public abstract class SimpleTableTestBase {
// snapshot 2
write.write(rowData(2, 20, 200L));
commit.commit(1, write.prepareCommit(false, 2));
- SnapshotManager snapshotManager = newSnapshotManager(new
TraceableFileIO(), tablePath);
+ SnapshotManager snapshotManager =
+ newSnapshotManager(new TraceableFileIO(),
table.location());
// The snapshot 1 is expired.
assertThat(snapshotManager.snapshotExists(1)).isFalse();
table.createTag("test-tag-2", 1);
@@ -1072,7 +1075,7 @@ public abstract class SimpleTableTestBase {
// snapshot 2
write.write(rowData(1, 10, 100L));
commit.commit(1, write.prepareCommit(false, 2));
- TagManager tagManager = new TagManager(new TraceableFileIO(),
tablePath);
+ TagManager tagManager = new TagManager(new TraceableFileIO(),
table.location());
table.createTag("test-tag", 1);
// verify that tag file exist
assertThat(tagManager.tagExists("test-tag")).isTrue();
@@ -1087,7 +1090,6 @@ public abstract class SimpleTableTestBase {
@Test
public void testCreateBranch() throws Exception {
FileStoreTable table = createFileStoreTable();
-
try (StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser)) {
// snapshot 1
@@ -1101,7 +1103,7 @@ public abstract class SimpleTableTestBase {
table.createTag("test-tag", 2);
// verify that tag file exist
- TagManager tagManager = new TagManager(new TraceableFileIO(),
tablePath);
+ TagManager tagManager = new TagManager(new TraceableFileIO(),
table.location());
assertThat(tagManager.tagExists("test-tag")).isTrue();
// verify that test-tag is equal to snapshot 2
@@ -1124,14 +1126,14 @@ public abstract class SimpleTableTestBase {
// verify snapshot in test-branch is equal to snapshot 2
SnapshotManager snapshotManager =
- newSnapshotManager(new TraceableFileIO(), tablePath,
"test-branch");
+ newSnapshotManager(new TraceableFileIO(), table.location(),
"test-branch");
Snapshot branchSnapshot =
Snapshot.fromPath(new TraceableFileIO(),
snapshotManager.snapshotPath(2));
assertThat(branchSnapshot.equals(snapshot2)).isTrue();
// verify schema in test-branch is equal to schema 0
SchemaManager schemaManager =
- new SchemaManager(new TraceableFileIO(), tablePath,
"test-branch");
+ new SchemaManager(new TraceableFileIO(), table.location(),
"test-branch");
TableSchema branchSchema =
TableSchema.fromPath(new TraceableFileIO(),
schemaManager.toSchemaPath(0));
TableSchema schema0 = schemaManager.schema(0);
@@ -1288,7 +1290,8 @@ public abstract class SimpleTableTestBase {
"2|20|200|binary|varbinary|mapKey:mapVal|multiset");
// verify snapshot in branch1 and main branch is same
- SnapshotManager snapshotManager = newSnapshotManager(new
TraceableFileIO(), tablePath);
+ SnapshotManager snapshotManager =
+ newSnapshotManager(new TraceableFileIO(), table.location());
Snapshot branchSnapshot =
Snapshot.fromPath(
new TraceableFileIO(),
@@ -1298,7 +1301,7 @@ public abstract class SimpleTableTestBase {
assertThat(branchSnapshot.equals(snapshot)).isTrue();
// verify schema in branch1 and main branch is same
- SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(),
tablePath);
+ SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(),
table.location());
TableSchema branchSchema =
TableSchema.fromPath(
new TraceableFileIO(),
@@ -1535,19 +1538,6 @@ public abstract class SimpleTableTestBase {
commit.close();
}
- @Test
- public void testSchemaPathOption() throws Exception {
- String fakePath = "fake path";
- FileStoreTable table = createFileStoreTable(conf ->
conf.set(CoreOptions.PATH, fakePath));
- String originSchemaPath =
table.schema().options().get(CoreOptions.PATH.key());
- assertThat(originSchemaPath).isEqualTo(fakePath);
- // reset PATH of schema option to table location
- table = table.copy(Collections.emptyMap());
- String schemaPath =
table.schema().options().get(CoreOptions.PATH.key());
- String tablePath = table.location().toString();
- assertThat(schemaPath).isEqualTo(tablePath);
- }
-
@Test
public void testBranchWriteAndRead() throws Exception {
FileStoreTable table = createFileStoreTable();
diff --git a/paimon-open-api/rest-catalog-open-api.yaml
b/paimon-open-api/rest-catalog-open-api.yaml
index c30f09f8ac..211be78a68 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -1519,13 +1519,13 @@ components:
properties:
message:
type: string
+ resourceType:
+ type: string
+ resourceName:
+ type: string
code:
type: integer
format: int32
- stack:
- type: array
- items:
- type: string
CreateTableRequest:
type: object
properties:
@@ -1606,9 +1606,7 @@ components:
properties:
database:
type: string
- table:
- type: string
- branch:
+ object:
type: string
Schema:
type: object