This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 88be8648c9 [rest] Partition methods should check table first in RESTCatalog
88be8648c9 is described below
commit 88be8648c9973e5711ff7ffe540b87865c985dbd
Author: JingsongLi <[email protected]>
AuthorDate: Mon Dec 30 19:52:14 2024 +0800
[rest] Partition methods should check table first in RESTCatalog
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 3 +-
.../java/org/apache/paimon/catalog/Catalog.java | 4 +-
.../java/org/apache/paimon/rest/RESTCatalog.java | 150 ++++++++++-----------
.../org/apache/paimon/rest/RESTCatalogTest.java | 15 ++-
4 files changed, 82 insertions(+), 90 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c63d92a144..02e662350f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -166,8 +166,7 @@ public abstract class AbstractCatalog implements Catalog {
FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);
if (table.partitionKeys().isEmpty() ||
!table.coreOptions().partitionedTableInMetastore()) {
- throw new UnsupportedOperationException(
- "The table is not partitioned table in metastore.");
+ return;
}
MetastoreClient.Factory metastoreFactory =
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 904c969107..0e1482c87b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -247,11 +247,11 @@ public interface Catalog extends AutoCloseable {
* Drop the partition of the specify table.
*
* @param identifier path of the table to drop partition
- * @param partitions the partition to be deleted
+ * @param partition the partition to be deleted
* @throws TableNotExistException if the table does not exist
* @throws PartitionNotExistException if the partition does not exist
*/
- void dropPartition(Identifier identifier, Map<String, String> partitions)
+ void dropPartition(Identifier identifier, Map<String, String> partition)
throws TableNotExistException, PartitionNotExistException;
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index c430e303b2..1a3d47cb26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -31,7 +31,6 @@ import
org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -65,7 +64,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
@@ -85,9 +84,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
@@ -360,6 +359,12 @@ public class RESTCatalog implements Catalog {
@Override
public void createPartition(Identifier identifier, Map<String, String>
partitionSpec)
throws TableNotExistException {
+ Table table = getTable(identifier);
+ Options options = Options.fromMap(table.options());
+ if (!options.get(METASTORE_PARTITIONED_TABLE)) {
+ return;
+ }
+
try {
CreatePartitionRequest request = new
CreatePartitionRequest(identifier, partitionSpec);
client.post(
@@ -376,27 +381,77 @@ public class RESTCatalog implements Catalog {
}
@Override
- public void dropPartition(Identifier identifier, Map<String, String>
partitions)
+ public void dropPartition(Identifier identifier, Map<String, String>
partition)
throws TableNotExistException, PartitionNotExistException {
checkNotSystemTable(identifier, "dropPartition");
- dropPartitionMetadata(identifier, partitions);
+
Table table = getTable(identifier);
- cleanPartitionsInFileSystem(table, partitions);
+ Options options = Options.fromMap(table.options());
+ if (options.get(METASTORE_PARTITIONED_TABLE)) {
+ try {
+ client.delete(
+ resourcePaths.partitions(
+ identifier.getDatabaseName(),
identifier.getTableName()),
+ new DropPartitionRequest(partition),
+ headers());
+ } catch (NoSuchResourceException ignore) {
+ throw new PartitionNotExistException(identifier, partition);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ }
+
+ try (BatchTableCommit commit =
+
table.newBatchWriteBuilder().withOverwrite(partition).newCommit()) {
+ commit.commit(Collections.emptyList());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
- FileStoreTable table = (FileStoreTable) getTable(identifier);
- boolean whetherSupportListPartitions =
- Boolean.parseBoolean(
-
table.options().get(CoreOptions.METASTORE_PARTITIONED_TABLE.key()));
- if (whetherSupportListPartitions) {
- RowType rowType = table.schema().logicalPartitionType();
- return listPartitionsFromServer(identifier, rowType);
- } else {
- return
getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+ Table table = getTable(identifier);
+ Options options = Options.fromMap(table.options());
+ if (!options.get(METASTORE_PARTITIONED_TABLE)) {
+ return table.newReadBuilder().newScan().listPartitionEntries();
}
+
+ ListPartitionsResponse response;
+ try {
+ response =
+ client.get(
+ resourcePaths.partitions(
+ identifier.getDatabaseName(),
identifier.getTableName()),
+ ListPartitionsResponse.class,
+ headers());
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+
+ if (response == null || response.getPartitions() == null) {
+ return Collections.emptyList();
+ }
+
+ RowType partitionType = table.rowType().project(table.partitionKeys());
+ InternalRowSerializer serializer = new
InternalRowSerializer(partitionType);
+ String defaultName = options.get(PARTITION_DEFAULT_NAME);
+ List<PartitionEntry> result = new ArrayList<>();
+ for (PartitionResponse partition : response.getPartitions()) {
+ GenericRow row =
+ convertSpecToInternalRow(partition.getSpec(),
partitionType, defaultName);
+ result.add(
+ new PartitionEntry(
+ serializer.toBinaryRow(row).copy(),
+ partition.getRecordCount(),
+ partition.getFileSizeInBytes(),
+ partition.getFileCount(),
+ partition.getLastFileCreationTime()));
+ }
+ return result;
}
@Override
@@ -444,41 +499,6 @@ public class RESTCatalog implements Catalog {
return table;
}
- private List<PartitionEntry> listPartitionsFromServer(Identifier
identifier, RowType rowType)
- throws TableNotExistException {
- try {
- ListPartitionsResponse response =
- client.get(
- resourcePaths.partitions(
- identifier.getDatabaseName(),
identifier.getTableName()),
- ListPartitionsResponse.class,
- headers());
- if (response != null && response.getPartitions() != null) {
- return response.getPartitions().stream()
- .map(p -> convertToPartitionEntry(p, rowType))
- .collect(Collectors.toList());
- } else {
- return Collections.emptyList();
- }
- } catch (NoSuchResourceException e) {
- throw new TableNotExistException(identifier);
- } catch (ForbiddenException e) {
- throw new TableNoPermissionException(identifier, e);
- }
- }
-
- private void cleanPartitionsInFileSystem(Table table, Map<String, String>
partitions) {
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- try (FileStoreCommit commit =
- fileStoreTable
- .store()
- .newCommit(
-
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
- commit.dropPartitions(
- Collections.singletonList(partitions),
BatchWriteBuilder.COMMIT_IDENTIFIER);
- }
- }
-
private GetTableResponse getTableResponse(Identifier identifier) throws
TableNotExistException {
try {
return client.get(
@@ -492,23 +512,6 @@ public class RESTCatalog implements Catalog {
}
}
- private boolean dropPartitionMetadata(Identifier identifier, Map<String,
String> partitions)
- throws TableNoPermissionException, PartitionNotExistException {
- try {
- DropPartitionRequest request = new
DropPartitionRequest(partitions);
- client.delete(
- resourcePaths.partitions(
- identifier.getDatabaseName(),
identifier.getTableName()),
- request,
- headers());
- return true;
- } catch (NoSuchResourceException ignore) {
- throw new PartitionNotExistException(identifier, partitions);
- } catch (ForbiddenException e) {
- throw new TableNoPermissionException(identifier, e);
- }
- }
-
private static Map<String, String> configHeaders(Map<String, String>
properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
}
@@ -540,17 +543,6 @@ public class RESTCatalog implements Catalog {
return refreshExecutor;
}
- private PartitionEntry convertToPartitionEntry(PartitionResponse
partition, RowType rowType) {
- InternalRowSerializer serializer = new InternalRowSerializer(rowType);
- GenericRow row = convertSpecToInternalRow(partition.getSpec(),
rowType, null);
- return new PartitionEntry(
- serializer.toBinaryRow(row).copy(),
- partition.getRecordCount(),
- partition.getFileSizeInBytes(),
- partition.getFileCount(),
- partition.getLastFileCreationTime());
- }
-
private static FileIO getFileIOFromOptions(CatalogContext context) {
try {
Options options = context.options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 67103aaa52..c41c1d2a9c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -34,7 +34,6 @@ import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
-import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
@@ -332,10 +331,12 @@ public class RESTCatalogTest {
@Test
public void testCreatePartition() throws Exception {
String databaseName = MockRESTMessage.databaseName();
+ GetTableResponse response = MockRESTMessage.getTableResponse();
+ mockResponse(mapper.writeValueAsString(response), 200);
+
Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("p1", "v1");
- PartitionResponse response = MockRESTMessage.partitionResponse();
- mockResponse(mapper.writeValueAsString(response), 200);
+
mockResponse(mapper.writeValueAsString(MockRESTMessage.partitionResponse()),
200);
assertDoesNotThrow(
() ->
restCatalog.createPartition(
@@ -386,11 +387,12 @@ public class RESTCatalogTest {
@Test
public void testDropPartitionWhenPartitionNoExist() throws Exception {
String databaseName = MockRESTMessage.databaseName();
+ GetTableResponse response =
MockRESTMessage.getTableResponseEnablePartition();
+ mockResponse(mapper.writeValueAsString(response), 200);
+
Map<String, String> partitionSpec = new HashMap<>();
- GetTableResponse response = MockRESTMessage.getTableResponse();
partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
mockResponse(mapper.writeValueAsString(""), 404);
- mockResponse(mapper.writeValueAsString(response), 200);
assertThrows(
Catalog.PartitionNotExistException.class,
() ->
@@ -418,7 +420,6 @@ public class RESTCatalogTest {
Map<String, String> partitionSpec = new HashMap<>();
GetTableResponse response = MockRESTMessage.getTableResponse();
partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
- mockResponse(mapper.writeValueAsString(""), 200);
mockResponse("", 404);
assertThrows(
Catalog.TableNotExistException.class,
@@ -442,7 +443,7 @@ public class RESTCatalogTest {
@Test
public void testListPartitionsFromFile() throws Exception {
String databaseName = MockRESTMessage.databaseName();
- GetTableResponse response = MockRESTMessage.getTableResponse();
+ GetTableResponse response =
MockRESTMessage.getTableResponseEnablePartition();
mockResponse(mapper.writeValueAsString(response), 200);
mockResponse(mapper.writeValueAsString(response), 200);
List<PartitionEntry> partitionEntries =