This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 75e9da48ed [core] Support in memory catalog for test RESTCatalog
(#5154)
75e9da48ed is described below
commit 75e9da48ed32586f5c930f96b23dc118cd214df9
Author: jerry <[email protected]>
AuthorDate: Wed Feb 26 17:56:55 2025 +0800
[core] Support in memory catalog for test RESTCatalog (#5154)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 2 +-
.../apache/paimon/catalog/FileSystemCatalog.java | 3 +-
.../java/org/apache/paimon/rest/RESTCatalog.java | 6 +-
.../rest/MetadataInMemoryFileSystemCatalog.java | 443 +++++++++++++++++++++
.../org/apache/paimon/rest/RESTCatalogServer.java | 74 +++-
.../org/apache/paimon/rest/RESTCatalogTest.java | 18 +-
.../org/apache/paimon/rest/TestRESTCatalog.java | 264 ------------
7 files changed, 526 insertions(+), 284 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 3df3f0d562..67d6b23187 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -216,7 +216,7 @@ public abstract class AbstractCatalog implements Catalog {
if (changes == null || changes.isEmpty()) {
return;
}
- alterDatabaseImpl(name, changes);
+ this.alterDatabaseImpl(name, changes);
} catch (DatabaseNotExistException e) {
if (ignoreIfNotExists) {
return;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index c1cc5bc230..366051fad5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -94,7 +94,8 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- protected void alterDatabaseImpl(String name, List<PropertyChange>
changes) {
+ protected void alterDatabaseImpl(String name, List<PropertyChange> changes)
+ throws DatabaseNotExistException {
throw new UnsupportedOperationException("Alter database is not
supported.");
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 9f373bd10a..141b22b45d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -241,9 +241,9 @@ public class RESTCatalog implements Catalog,
SupportsSnapshots {
request,
AlterDatabaseResponse.class,
restAuthFunction);
- if (response.getUpdated().isEmpty()) {
- throw new IllegalStateException("Failed to update properties");
- }
+ // if (response.getUpdated().isEmpty()) {
+ // throw new IllegalStateException("Failed to
update properties");
+ // }
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(name);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
new file mode 100644
index 0000000000..e0c5ab9446
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TableType;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.SupportsSnapshots;
+import org.apache.paimon.catalog.TableMetadata;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.view.View;
+
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.PATH;
+
+/** A catalog for testing RESTCatalog. */
+public class MetadataInMemoryFileSystemCatalog extends FileSystemCatalog
+ implements SupportsSnapshots {
+
+ public final Map<String, Database> databaseStore; // databases, keyed by database name
+ public final Map<String, TableMetadata> tableMetadataStore; // keyed by Identifier#getFullName()
+ public final Map<String, List<Partition>> tablePartitionsStore; // keyed by Identifier#getFullName()
+ public final Map<String, View> viewStore; // keyed by Identifier#getFullName()
+ public final Map<String, Snapshot> tableSnapshotStore; // keyed by Identifier#getFullName()
+ public final Map<String, RESTToken> dataTokenStore; // keyed by Identifier#getFullName()
+ public FileSystemCatalog fileSystemCatalog; // assigned in the constructor; no other use is visible in this class
+
+    /**
+     * Builds a catalog whose metadata is kept in the supplied maps.
+     *
+     * <p>The maps are stored by reference (not copied), so whoever passed them in observes every
+     * change this catalog makes, and vice versa.
+     */
+    public MetadataInMemoryFileSystemCatalog(
+            FileIO fileIO,
+            Path warehouse,
+            Options options,
+            Map<String, Database> databaseStore,
+            Map<String, TableMetadata> tableMetadataStore,
+            Map<String, Snapshot> tableSnapshotStore,
+            Map<String, List<Partition>> tablePartitionsStore,
+            Map<String, View> viewStore,
+            Map<String, RESTToken> dataTokenStore) {
+        super(fileIO, warehouse, options);
+
+        // Shared in-memory stores.
+        this.databaseStore = databaseStore;
+        this.tableMetadataStore = tableMetadataStore;
+        this.tableSnapshotStore = tableSnapshotStore;
+        this.tablePartitionsStore = tablePartitionsStore;
+        this.viewStore = viewStore;
+        this.dataTokenStore = dataTokenStore;
+
+        // A second, plain file system catalog over the same warehouse.
+        this.fileSystemCatalog = new FileSystemCatalog(fileIO, warehouse, options);
+    }
+
+    /**
+     * Factory: resolves the warehouse path from {@code context}, obtains a {@link FileIO} for it,
+     * calls {@code checkOrMkdirs} on that path and wires the given stores into a new catalog.
+     *
+     * @throws UncheckedIOException if obtaining the {@link FileIO} or checking the warehouse
+     *     directory fails with an {@link IOException}
+     */
+    public static MetadataInMemoryFileSystemCatalog create(
+            CatalogContext context,
+            Map<String, Database> databaseStore,
+            Map<String, TableMetadata> tableMetadataStore,
+            Map<String, Snapshot> tableSnapshotStore,
+            Map<String, List<Partition>> tablePartitionsStore,
+            Map<String, View> viewStore,
+            Map<String, RESTToken> dataTokenStore) {
+        Path root = new Path(CatalogFactory.warehouse(context).toUri().toString());
+
+        FileIO io;
+        try {
+            io = FileIO.get(root, context);
+            io.checkOrMkdirs(root);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return new MetadataInMemoryFileSystemCatalog(
+                io,
+                root,
+                context.options(),
+                databaseStore,
+                tableMetadataStore,
+                tableSnapshotStore,
+                tablePartitionsStore,
+                viewStore,
+                dataTokenStore);
+    }
+
+    /** Returns a fresh list holding the names of all databases currently in the store. */
+    @Override
+    public List<String> listDatabases() {
+        List<String> names = new ArrayList<>();
+        names.addAll(databaseStore.keySet());
+        return names;
+    }
+
+    /**
+     * Creates the database through {@link FileSystemCatalog} first and, once that has succeeded,
+     * records it in {@link #databaseStore}.
+     */
+    @Override
+    protected void createDatabaseImpl(String name, Map<String, String> properties) {
+        super.createDatabaseImpl(name, properties);
+        Database created = Database.of(name, properties, null);
+        databaseStore.put(name, created);
+    }
+
+    /**
+     * Looks a database up in {@link #databaseStore}.
+     *
+     * @throws DatabaseNotExistException if the store has no entry for {@code name}
+     */
+    @Override
+    public Database getDatabaseImpl(String name) throws DatabaseNotExistException {
+        if (!databaseStore.containsKey(name)) {
+            throw new DatabaseNotExistException(name);
+        }
+        return databaseStore.get(name);
+    }
+
+ @Override
+ protected void dropDatabaseImpl(String name) { // drops via FileSystemCatalog, then forgets the database in memory
+ super.dropDatabaseImpl(name); // runs first: if it throws, the in-memory entry is kept
+ databaseStore.remove(name); // NOTE(review): table/view/partition/snapshot entries of this database stay in their stores — confirm intended
+ }
+
+    /**
+     * Applies property changes to a database held in {@link #databaseStore}.
+     *
+     * <p>Properties to set are applied before removals, so a key that appears in both groups ends
+     * up removed. The stored {@link Database} is replaced by a new instance; the existing one is
+     * not mutated.
+     *
+     * @param name database to alter
+     * @param changes set/remove operations, split by {@link
+     *     PropertyChange#getSetPropertiesToRemoveKeys}
+     * @throws DatabaseNotExistException if the store has no entry for {@code name}
+     */
+    @Override
+    protected void alterDatabaseImpl(String name, List<PropertyChange> changes)
+            throws DatabaseNotExistException {
+        if (!databaseStore.containsKey(name)) {
+            throw new DatabaseNotExistException(name);
+        }
+        Pair<Map<String, String>, Set<String>> setAndRemove =
+                PropertyChange.getSetPropertiesToRemoveKeys(changes);
+        // Work on a copy so the options map of the stored Database is never modified in place.
+        Map<String, String> updated = new HashMap<>(databaseStore.get(name).options());
+        updated.putAll(setAndRemove.getLeft());
+        updated.keySet().removeAll(setAndRemove.getRight());
+        databaseStore.put(name, Database.of(name, updated, null));
+    }
+
+    /**
+     * Lists the table names of one database by parsing every key of {@link #tableMetadataStore}
+     * back into an {@link Identifier} and keeping those whose database name matches.
+     */
+    @Override
+    protected List<String> listTablesImpl(String databaseName) {
+        return tableMetadataStore.keySet().stream()
+                .map(Identifier::fromString)
+                .filter(id -> databaseName.equals(id.getDatabaseName()))
+                .map(Identifier::getTableName)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates the table through {@link FileSystemCatalog}, then stores matching metadata in
+     * {@link #tableMetadataStore} with schema id 1, a random UUID and {@code isExternal = false}.
+     *
+     * <p>Any exception raised while building or storing the metadata is rethrown wrapped in a
+     * {@link RuntimeException}.
+     */
+    @Override
+    public void createTableImpl(Identifier identifier, Schema schema) {
+        super.createTableImpl(identifier, schema);
+        try {
+            String tableUuid = UUID.randomUUID().toString();
+            TableMetadata created = createTableMetadata(identifier, 1L, schema, tableUuid, false);
+            tableMetadataStore.put(identifier.getFullName(), created);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds the {@link TableMetadata} kept in memory for a table.
+     *
+     * <p>The schema options are copied and the table location is added under {@code PATH.key()}.
+     */
+    private TableMetadata createTableMetadata(
+            Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) {
+        Map<String, String> tableOptions = new HashMap<>(schema.options());
+        Path location = getTableLocation(identifier);
+        tableOptions.put(PATH.key(), location.toString());
+        return new TableMetadata(
+                new TableSchema(
+                        schemaId,
+                        schema.fields(),
+                        // presumably the highest field id, assuming ids run 0..n-1 — TODO confirm
+                        schema.fields().size() - 1,
+                        schema.partitionKeys(),
+                        schema.primaryKeys(),
+                        tableOptions,
+                        schema.comment()),
+                isExternal,
+                uuid);
+    }
+
+    /**
+     * Drops a table that is known to {@link #tableMetadataStore}: the in-memory entry is removed
+     * first, then the drop is delegated to {@link FileSystemCatalog}. Unknown tables are ignored.
+     */
+    @Override
+    protected void dropTableImpl(Identifier identifier) {
+        String key = identifier.getFullName();
+        if (!tableMetadataStore.containsKey(key)) {
+            return;
+        }
+        tableMetadataStore.remove(key);
+        super.dropTableImpl(identifier);
+    }
+
+    /**
+     * Renames a table that is known to {@link #tableMetadataStore}: the rename is delegated to
+     * {@link FileSystemCatalog} first, then the metadata entry is moved to the new full name.
+     * Unknown source tables are ignored.
+     */
+    @Override
+    public void renameTableImpl(Identifier fromTable, Identifier toTable) {
+        String oldKey = fromTable.getFullName();
+        if (!tableMetadataStore.containsKey(oldKey)) {
+            return;
+        }
+        super.renameTableImpl(fromTable, toTable);
+        TableMetadata moved = tableMetadataStore.remove(oldKey);
+        tableMetadataStore.put(toTable.getFullName(), moved);
+    }
+
+ @Override
+ protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ if (tableMetadataStore.containsKey(identifier.getFullName())) { // a table missing from the in-memory store is skipped without an exception
+ TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ TableSchema schema = tableMetadata.schema();
+ Options options = Options.fromMap(schema.options());
+ if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) { // format tables are rejected
+ throw new UnsupportedOperationException("Only data table
support alter table.");
+ }
+ SchemaManager schemaManager = schemaManager(identifier);
+ try {
+ TableSchema newSchema =
+ runWithLock(identifier, () ->
schemaManager.commitChanges(changes)); // commit the changes via SchemaManager inside runWithLock
+ TableMetadata newTableMetadata =
+ createTableMetadata(
+ identifier,
+ newSchema.id(),
+ newSchema.toSchema(),
+ tableMetadata.uuid(),
+ tableMetadata.isExternal()); // uuid and isExternal are carried over from the old metadata
+ tableMetadataStore.put(identifier.getFullName(),
newTableMetadata); // replace the in-memory entry with the committed schema
+ } catch (TableNotExistException
+ | ColumnAlreadyExistException
+ | ColumnNotExistException
+ | RuntimeException e) {
+ throw e; // declared and unchecked exceptions propagate unchanged
+ } catch (Exception e) {
+ throw new RuntimeException(e); // any other checked exception is wrapped
+ }
+ }
+ }
+
+    /** Creates a {@link SchemaManager} for the table's location and its branch (or the default). */
+    private SchemaManager schemaManager(Identifier identifier) {
+        return new SchemaManager(
+                fileIO, getTableLocation(identifier), identifier.getBranchNameOrDefault());
+    }
+
+    /**
+     * Registers a format table purely in memory: metadata with schema id 1, a random UUID and
+     * {@code isExternal = true} is put into {@link #tableMetadataStore}. No call to the superclass
+     * is made.
+     */
+    @Override
+    public void createFormatTable(Identifier identifier, Schema schema) {
+        String tableUuid = UUID.randomUUID().toString();
+        TableMetadata created = createTableMetadata(identifier, 1L, schema, tableUuid, true);
+        tableMetadataStore.put(identifier.getFullName(), created);
+    }
+
+    /**
+     * Returns the schema stored in memory for a table.
+     *
+     * @throws TableNotExistException if {@link #tableMetadataStore} has no entry for the table
+     */
+    @Override
+    public TableSchema loadTableSchema(Identifier identifier) throws TableNotExistException {
+        String key = identifier.getFullName();
+        if (!tableMetadataStore.containsKey(key)) {
+            throw new TableNotExistException(identifier);
+        }
+        return tableMetadataStore.get(key).schema();
+    }
+
+    /**
+     * Returns the metadata stored in memory for a table.
+     *
+     * @throws TableNotExistException if {@link #tableMetadataStore} has no entry for the table
+     */
+    @Override
+    protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException {
+        String key = identifier.getFullName();
+        if (!tableMetadataStore.containsKey(key)) {
+            throw new TableNotExistException(identifier);
+        }
+        return tableMetadataStore.get(key);
+    }
+
+    /**
+     * Registers partitions for a table in {@link #tablePartitionsStore}.
+     *
+     * <p>New partitions are appended to whatever is already stored for the table instead of
+     * replacing it, and a spec that is already present is not added a second time. The backing
+     * list is always an {@link ArrayList}, because {@code dropPartitions} and {@code
+     * alterPartitions} modify it in place.
+     *
+     * @param identifier table the partitions belong to
+     * @param partitions partition specs to register
+     * @throws TableNotExistException if {@code getTable(identifier)} does not find the table
+     */
+    @Override
+    public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
+            throws TableNotExistException {
+        getTable(identifier);
+        List<Partition> stored =
+                tablePartitionsStore.computeIfAbsent(
+                        identifier.getFullName(), key -> new ArrayList<>());
+        for (Map<String, String> spec : partitions) {
+            boolean alreadyPresent = stored.stream().anyMatch(p -> p.spec().equals(spec));
+            if (!alreadyPresent) {
+                stored.add(spec2Partition(spec));
+            }
+        }
+    }
+
+    /**
+     * Removes partitions of a table from {@link #tablePartitionsStore}.
+     *
+     * <p>A stored partition is removed when it matches <em>every</em> key/value pair of at least
+     * one of the given specs. Matching on a single pair would let a multi-key spec such as {@code
+     * {dt=1, hr=2}} remove unrelated partitions. Empty specs are ignored, and a table without any
+     * stored partitions is a no-op rather than a {@link NullPointerException}.
+     *
+     * @param identifier table the partitions belong to
+     * @param partitions specs of the partitions to drop
+     * @throws TableNotExistException if {@code getTable(identifier)} does not find the table
+     */
+    @Override
+    public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
+            throws TableNotExistException {
+        getTable(identifier);
+        List<Partition> stored = tablePartitionsStore.get(identifier.getFullName());
+        if (stored == null) {
+            // Nothing was ever registered for this table.
+            return;
+        }
+        stored.removeIf(
+                existing ->
+                        partitions.stream()
+                                .anyMatch(
+                                        spec ->
+                                                !spec.isEmpty()
+                                                        && existing.spec()
+                                                                .entrySet()
+                                                                .containsAll(spec.entrySet())));
+    }
+
+    /**
+     * Replaces partitions of a table in {@link #tablePartitionsStore}.
+     *
+     * <p>Every stored partition whose spec equals the spec of one of the given partitions is
+     * removed, then all given partitions are added. Comparing whole specs prevents a partition
+     * that merely shares one key/value pair from being removed. A table without stored partitions
+     * starts from an empty list rather than failing with a {@link NullPointerException}.
+     *
+     * @param identifier table the partitions belong to
+     * @param partitions new partition entries
+     * @throws TableNotExistException if {@code getTable(identifier)} does not find the table
+     */
+    @Override
+    public void alterPartitions(Identifier identifier, List<Partition> partitions)
+            throws TableNotExistException {
+        getTable(identifier);
+        List<Partition> stored =
+                tablePartitionsStore.computeIfAbsent(
+                        identifier.getFullName(), key -> new ArrayList<>());
+        stored.removeIf(
+                existing ->
+                        partitions.stream()
+                                .anyMatch(updated -> updated.spec().equals(existing.spec())));
+        stored.addAll(partitions);
+    }
+
+    /**
+     * Returns the partition list stored for a table, i.e. the result of {@code Map.get} on
+     * {@link #tablePartitionsStore}: the stored list itself, or {@code null} when the table has no
+     * entry.
+     *
+     * @throws TableNotExistException if {@code getTable(identifier)} does not find the table
+     */
+    @Override
+    public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
+        getTable(identifier);
+        String key = identifier.getFullName();
+        return tablePartitionsStore.get(key);
+    }
+
+    /**
+     * Looks a view up in {@link #viewStore}.
+     *
+     * @throws ViewNotExistException if the store has no entry for the view
+     */
+    @Override
+    public View getView(Identifier identifier) throws ViewNotExistException {
+        String key = identifier.getFullName();
+        if (!viewStore.containsKey(key)) {
+            throw new ViewNotExistException(identifier);
+        }
+        return viewStore.get(key);
+    }
+
+    /**
+     * Removes a view from {@link #viewStore}.
+     *
+     * @param identifier view to drop
+     * @param ignoreIfNotExists when {@code true}, dropping a missing view is a silent no-op
+     * @throws ViewNotExistException only if the view is missing and {@code ignoreIfNotExists} is
+     *     {@code false}; a successful drop never throws
+     */
+    @Override
+    public void dropView(Identifier identifier, boolean ignoreIfNotExists)
+            throws ViewNotExistException {
+        String key = identifier.getFullName();
+        if (viewStore.containsKey(key)) {
+            viewStore.remove(key);
+            // The view existed and is gone now: success, regardless of ignoreIfNotExists.
+            return;
+        }
+        if (!ignoreIfNotExists) {
+            throw new ViewNotExistException(identifier);
+        }
+    }
+
+    /**
+     * Stores a view in {@link #viewStore}.
+     *
+     * @param identifier name of the view
+     * @param view view definition to store
+     * @param ignoreIfExists when {@code true} and the view already exists, the existing
+     *     definition is left untouched and the call returns normally
+     * @throws ViewAlreadyExistException if the view exists and {@code ignoreIfExists} is {@code
+     *     false}
+     * @throws DatabaseNotExistException if {@code getDatabase} does not find the view's database
+     */
+    @Override
+    public void createView(Identifier identifier, View view, boolean ignoreIfExists)
+            throws ViewAlreadyExistException, DatabaseNotExistException {
+        getDatabase(identifier.getDatabaseName());
+        String key = identifier.getFullName();
+        if (viewStore.containsKey(key)) {
+            if (ignoreIfExists) {
+                // "Ignore" means keep what is there, not overwrite it.
+                return;
+            }
+            throw new ViewAlreadyExistException(identifier);
+        }
+        viewStore.put(key, view);
+    }
+
+    /**
+     * Lists the view names of one database by parsing every key of {@link #viewStore} back into
+     * an {@link Identifier} and keeping those whose database name matches.
+     *
+     * @throws DatabaseNotExistException if {@code getDatabase} does not find the database
+     */
+    @Override
+    public List<String> listViews(String databaseName) throws DatabaseNotExistException {
+        getDatabase(databaseName);
+        List<String> names = new ArrayList<>();
+        for (String fullName : viewStore.keySet()) {
+            Identifier parsed = Identifier.fromString(fullName);
+            if (parsed.getDatabaseName().equals(databaseName)) {
+                names.add(parsed.getTableName());
+            }
+        }
+        return names;
+    }
+
+    /**
+     * Moves a view entry in {@link #viewStore} from one full name to another.
+     *
+     * <p>The checks run in this order: a missing source (unless ignored), then an existing
+     * target. A missing source with {@code ignoreIfNotExists = true} therefore still fails when
+     * the target name is taken.
+     *
+     * @throws ViewNotExistException if the source is missing and {@code ignoreIfNotExists} is
+     *     {@code false}
+     * @throws ViewAlreadyExistException if an entry for {@code toView} already exists
+     */
+    @Override
+    public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists)
+            throws ViewNotExistException, ViewAlreadyExistException {
+        String sourceKey = fromView.getFullName();
+        String targetKey = toView.getFullName();
+        boolean sourceExists = viewStore.containsKey(sourceKey);
+
+        if (!(sourceExists || ignoreIfNotExists)) {
+            throw new ViewNotExistException(fromView);
+        }
+        if (viewStore.containsKey(targetKey)) {
+            throw new ViewAlreadyExistException(toView);
+        }
+        if (sourceExists) {
+            View moved = viewStore.remove(sourceKey);
+            viewStore.put(targetKey, moved);
+        }
+    }
+
+ @Override
+ public boolean commitSnapshot(
+ Identifier identifier, Snapshot snapshot, List<Partition>
statistics)
+ throws TableNotExistException {
+ tableSnapshotStore.put(identifier.getFullName(), snapshot); // keeps one snapshot per table (a later call overwrites); `statistics` is not used
+ return false; // NOTE(review): always false even though the snapshot was stored — confirm what callers expect
+ }
+
+    /**
+     * Returns the snapshot stored for a table in {@link #tableSnapshotStore}, or an empty
+     * {@link Optional} when there is none. Whether the table exists is not checked here.
+     */
+    @Override
+    public Optional<Snapshot> loadSnapshot(Identifier identifier) throws TableNotExistException {
+        Snapshot stored = tableSnapshotStore.get(identifier.getFullName());
+        return Optional.ofNullable(stored);
+    }
+
+    /**
+     * Returns the data token stored for a table, creating and storing one on first request.
+     *
+     * <p>A generated token carries the keys {@code akId} and {@code akSecret}, each suffixed with
+     * the current time in milliseconds; the same timestamp is passed as the second {@link
+     * RESTToken} constructor argument (presumably the expiry time — TODO confirm).
+     */
+    public RESTToken getToken(Identifier identifier) {
+        String key = identifier.getFullName();
+        if (!dataTokenStore.containsKey(key)) {
+            long now = System.currentTimeMillis();
+            RESTToken generated =
+                    new RESTToken(
+                            ImmutableMap.of("akId", "akId" + now, "akSecret", "akSecret" + now),
+                            now);
+            dataTokenStore.put(key, generated);
+        }
+        return dataTokenStore.get(key);
+    }
+
+ private Partition spec2Partition(Map<String, String> spec) {
+ // TODO: the four numeric arguments below are hard-coded placeholders and still need real values.
+ return new Partition(spec, 123, 456, 789, 123);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 564e1e5d8a..fd84bf7924 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -19,16 +19,20 @@
package org.apache.paimon.rest;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
+import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterPartitionsRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CommitTableRequest;
@@ -39,6 +43,7 @@ import org.apache.paimon.rest.requests.CreateViewRequest;
import org.apache.paimon.rest.requests.DropPartitionsRequest;
import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.CommitTableResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponse;
@@ -67,16 +72,18 @@ import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Collectors;
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
-import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
/** Mock REST server for testing. */
public class RESTCatalogServer {
@@ -84,16 +91,31 @@ public class RESTCatalogServer {
private static final String PREFIX = "paimon";
private static final String DATABASE_URI =
String.format("/v1/%s/databases", PREFIX);
- private final Catalog catalog;
+ private final MetadataInMemoryFileSystemCatalog catalog;
private final Dispatcher dispatcher;
private final MockWebServer server;
private final String authToken;
+ public final Map<String, Database> databaseStore = new HashMap<>();
+ public final Map<String, TableMetadata> tableMetadataStore = new
HashMap<>();
+ public final Map<String, List<Partition>> tablePartitionsStore = new
HashMap<>();
+ public final Map<String, View> viewStore = new HashMap<>();
+ public final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
+ Map<String, RESTToken> dataTokenStore = new HashMap<>();
+
public RESTCatalogServer(String warehouse, String initToken) {
authToken = initToken;
Options conf = new Options();
conf.setString("warehouse", warehouse);
- this.catalog = TestRESTCatalog.create(CatalogContext.create(conf));
+ this.catalog =
+ MetadataInMemoryFileSystemCatalog.create(
+ CatalogContext.create(conf),
+ databaseStore,
+ tableMetadataStore,
+ tableSnapshotStore,
+ tablePartitionsStore,
+ viewStore,
+ dataTokenStore);
this.dispatcher = initDispatcher(catalog, warehouse, authToken);
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
@@ -112,7 +134,16 @@ public class RESTCatalogServer {
server.shutdown();
}
- public static Dispatcher initDispatcher(Catalog catalog, String warehouse,
String authToken) {
+ public void setTableSnapshot(Identifier identifier, Snapshot snapshot) {
+ tableSnapshotStore.put(identifier.getFullName(), snapshot);
+ }
+
+ public void setDataToken(Identifier identifier, RESTToken token) {
+ dataTokenStore.put(identifier.getFullName(), token);
+ }
+
+ public static Dispatcher initDispatcher(
+ MetadataInMemoryFileSystemCatalog catalog, String warehouse,
String authToken) {
return new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest request) {
@@ -241,17 +272,22 @@ public class RESTCatalogServer {
}
return partitionsApiHandler(catalog, request,
databaseName, tableName);
} else if (isTableToken) {
+ RESTToken dataToken =
+
catalog.getToken(Identifier.create(databaseName, resources[2]));
GetTableTokenResponse getTableTokenResponse =
new GetTableTokenResponse(
- ImmutableMap.of("key", "value"),
- System.currentTimeMillis());
+ dataToken.token(),
dataToken.expireAtMillis());
return new MockResponse()
.setResponseCode(200)
.setBody(
OBJECT_MAPPER.writeValueAsString(
getTableTokenResponse));
} else if (isTableSnapshot) {
- if (!"my_snapshot_table".equals(resources[2])) {
+ String tableName = resources[2];
+ Optional<Snapshot> snapshotOptional =
+ catalog.loadSnapshot(
+ Identifier.create(databaseName,
tableName));
+ if (!snapshotOptional.isPresent()) {
response =
new ErrorResponse(
ErrorResponseResourceType.SNAPSHOT,
@@ -261,8 +297,7 @@ public class RESTCatalogServer {
return mockResponse(response, 404);
}
GetTableSnapshotResponse getTableSnapshotResponse =
- new GetTableSnapshotResponse(
- createSnapshotWithMillis(10086,
100));
+ new
GetTableSnapshotResponse(snapshotOptional.get());
return new MockResponse()
.setResponseCode(200)
.setBody(
@@ -447,6 +482,25 @@ public class RESTCatalogServer {
case "DELETE":
catalog.dropDatabase(databaseName, false, true);
return new MockResponse().setResponseCode(200);
+ case "POST":
+ AlterDatabaseRequest requestBody =
+ OBJECT_MAPPER.readValue(
+ request.getBody().readUtf8(),
AlterDatabaseRequest.class);
+ List<PropertyChange> changes = new ArrayList<>();
+ for (String property : requestBody.getRemovals()) {
+ changes.add(PropertyChange.removeProperty(property));
+ }
+ for (Map.Entry<String, String> entry :
requestBody.getUpdates().entrySet()) {
+ changes.add(PropertyChange.setProperty(entry.getKey(),
entry.getValue()));
+ }
+ catalog.alterDatabase(databaseName, changes, false);
+ AlterDatabaseResponse alterDatabaseResponse =
+ new AlterDatabaseResponse(
+ requestBody.getRemovals(),
+ requestBody.getUpdates().keySet().stream()
+ .collect(Collectors.toList()),
+ Collections.emptyList());
+ return mockResponse(alterDatabaseResponse, 200);
default:
return new MockResponse().setResponseCode(404);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index b0cf23202a..3ba1943d10 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -50,6 +50,7 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -149,12 +150,14 @@ class RESTCatalogTest extends CatalogTestBase {
options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
-
- Optional<Snapshot> snapshot =
- catalog.loadSnapshot(Identifier.create("test_db_a",
"my_snapshot_table"));
+ Identifier hasSnapshotTable = Identifier.create("test_db_a",
"my_snapshot_table");
+ long id = 10086;
+ long millis = System.currentTimeMillis();
+ restCatalogServer.setTableSnapshot(hasSnapshotTable,
createSnapshotWithMillis(id, millis));
+ Optional<Snapshot> snapshot = catalog.loadSnapshot(hasSnapshotTable);
assertThat(snapshot).isPresent();
- assertThat(snapshot.get().id()).isEqualTo(10086);
- assertThat(snapshot.get().timeMillis()).isEqualTo(100);
+ assertThat(snapshot.get().id()).isEqualTo(id);
+ assertThat(snapshot.get().timeMillis()).isEqualTo(millis);
snapshot = catalog.loadSnapshot(Identifier.create("test_db_a",
"unknown"));
assertThat(snapshot).isEmpty();
@@ -175,6 +178,11 @@ class RESTCatalogTest extends CatalogTestBase {
return true;
}
+ @Override
+ protected boolean supportsAlterDatabase() {
+ return true;
+ }
+
private void createTable(
Identifier identifier, Map<String, String> options, List<String>
partitionKeys)
throws Exception {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
deleted file mode 100644
index a812965310..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.TableType;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.FileSystemCatalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.catalog.TableMetadata;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.Partition;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.view.View;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/** A catalog for testing RESTCatalog. */
-public class TestRESTCatalog extends FileSystemCatalog {
-
- public Map<String, TableSchema> tableFullName2Schema = new HashMap<String, TableSchema>();
- public Map<String, List<Partition>> tableFullName2Partitions =
- new HashMap<String, List<Partition>>();
- public final Map<String, View> viewFullName2View = new HashMap<String, View>();
-
- public TestRESTCatalog(FileIO fileIO, Path warehouse, Options options) {
- super(fileIO, warehouse, options);
- }
-
- public static TestRESTCatalog create(CatalogContext context) {
- String warehouse = CatalogFactory.warehouse(context).toUri().toString();
-
- Path warehousePath = new Path(warehouse);
- FileIO fileIO;
-
- try {
- fileIO = FileIO.get(warehousePath, context);
- fileIO.checkOrMkdirs(warehousePath);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- return new TestRESTCatalog(fileIO, warehousePath, context.options());
- }
-
- @Override
- public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
- throws TableNotExistException {
- getTable(identifier);
- tableFullName2Partitions.put(
- identifier.getFullName(),
- partitions.stream()
- .map(partition -> spec2Partition(partition))
- .collect(Collectors.toList()));
- }
-
- @Override
- public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
- throws TableNotExistException {
- getTable(identifier);
- List<Partition> existPartitions = tableFullName2Partitions.get(identifier.getFullName());
- partitions.forEach(
- partition -> {
- for (Map.Entry<String, String> entry : partition.entrySet()) {
- existPartitions.stream()
- .filter(
- p ->
- p.spec().containsKey(entry.getKey())
- && p.spec()
- .get(entry.getKey())
- .equals(entry.getValue()))
- .findFirst()
- .ifPresent(
- existPartition -> existPartitions.remove(existPartition));
- }
- });
- }
-
- @Override
- public void alterPartitions(Identifier identifier, List<Partition> partitions)
- throws TableNotExistException {
- getTable(identifier);
- List<Partition> existPartitions = tableFullName2Partitions.get(identifier.getFullName());
- partitions.forEach(
- partition -> {
- for (Map.Entry<String, String> entry : partition.spec().entrySet()) {
- existPartitions.stream()
- .filter(
- p ->
- p.spec().containsKey(entry.getKey())
- && p.spec()
- .get(entry.getKey())
- .equals(entry.getValue()))
- .findFirst()
- .ifPresent(
- existPartition -> existPartitions.remove(existPartition));
- }
- });
- existPartitions.addAll(partitions);
- tableFullName2Partitions.put(identifier.getFullName(), existPartitions);
- }
-
- @Override
- public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
- getTable(identifier);
- return tableFullName2Partitions.get(identifier.getFullName());
- }
-
- @Override
- public View getView(Identifier identifier) throws ViewNotExistException {
- if (viewFullName2View.containsKey(identifier.getFullName())) {
- return viewFullName2View.get(identifier.getFullName());
- }
- throw new ViewNotExistException(identifier);
- }
-
- @Override
- public void dropView(Identifier identifier, boolean ignoreIfNotExists)
- throws ViewNotExistException {
- if (viewFullName2View.containsKey(identifier.getFullName())) {
- viewFullName2View.remove(identifier.getFullName());
- }
- if (!ignoreIfNotExists) {
- throw new ViewNotExistException(identifier);
- }
- }
-
- @Override
- public void createView(Identifier identifier, View view, boolean ignoreIfExists)
- throws ViewAlreadyExistException, DatabaseNotExistException {
- getDatabase(identifier.getDatabaseName());
- if (viewFullName2View.containsKey(identifier.getFullName()) && !ignoreIfExists) {
- throw new ViewAlreadyExistException(identifier);
- }
- viewFullName2View.put(identifier.getFullName(), view);
- }
-
- @Override
- public List<String> listViews(String databaseName) throws DatabaseNotExistException {
- getDatabase(databaseName);
- return viewFullName2View.keySet().stream()
- .map(v -> Identifier.fromString(v))
- .filter(identifier -> identifier.getDatabaseName().equals(databaseName))
- .map(identifier -> identifier.getTableName())
- .collect(Collectors.toList());
- }
-
- @Override
- public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists)
- throws ViewNotExistException, ViewAlreadyExistException {
- if (!viewFullName2View.containsKey(fromView.getFullName()) && !ignoreIfNotExists) {
- throw new ViewNotExistException(fromView);
- }
- if (viewFullName2View.containsKey(toView.getFullName())) {
- throw new ViewAlreadyExistException(toView);
- }
- if (viewFullName2View.containsKey(fromView.getFullName())) {
- View view = viewFullName2View.get(fromView.getFullName());
- viewFullName2View.remove(fromView.getFullName());
- viewFullName2View.put(toView.getFullName(), view);
- }
- }
-
- @Override
- protected List<String> listTablesImpl(String databaseName) {
- List<String> tables = super.listTablesImpl(databaseName);
- for (Map.Entry<String, TableSchema> entry : tableFullName2Schema.entrySet()) {
- Identifier identifier = Identifier.fromString(entry.getKey());
- if (databaseName.equals(identifier.getDatabaseName())) {
- tables.add(identifier.getTableName());
- }
- }
- return tables;
- }
-
- @Override
- protected void dropTableImpl(Identifier identifier) {
- if (tableFullName2Schema.containsKey(identifier.getFullName())) {
- tableFullName2Schema.remove(identifier.getFullName());
- } else {
- super.dropTableImpl(identifier);
- }
- }
-
- @Override
- public void renameTableImpl(Identifier fromTable, Identifier toTable) {
- if (tableFullName2Schema.containsKey(fromTable.getFullName())) {
- TableSchema tableSchema = tableFullName2Schema.get(fromTable.getFullName());
- tableFullName2Schema.remove(fromTable.getFullName());
- tableFullName2Schema.put(toTable.getFullName(), tableSchema);
- } else {
- super.renameTableImpl(fromTable, toTable);
- }
- }
-
- @Override
- protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
- throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
- if (tableFullName2Schema.containsKey(identifier.getFullName())) {
- TableSchema schema = tableFullName2Schema.get(identifier.getFullName());
- Options options = Options.fromMap(schema.options());
- if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) {
- throw new UnsupportedOperationException("Only data table support alter table.");
- }
- } else {
- super.alterTableImpl(identifier, changes);
- }
- }
-
- @Override
- public void createFormatTable(Identifier identifier, Schema schema) {
- Map<String, String> options = new HashMap<>(schema.options());
- options.put("path", "/tmp/format_table");
- TableSchema tableSchema =
- new TableSchema(
- 1L,
- schema.fields(),
- 1,
- schema.partitionKeys(),
- schema.primaryKeys(),
- options,
- schema.comment());
- tableFullName2Schema.put(identifier.getFullName(), tableSchema);
- }
-
- @Override
- protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException {
- if (tableFullName2Schema.containsKey(identifier.getFullName())) {
- TableSchema tableSchema = tableFullName2Schema.get(identifier.getFullName());
- return new TableMetadata(tableSchema, false, "uuid");
- }
- return super.loadTableMetadata(identifier);
- }
-
- private Partition spec2Partition(Map<String, String> spec) {
- return new Partition(spec, 123, 456, 789, 123);
- }
-}