This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 88be8648c9 [rest] Partition methods should check table first in RESTCatalog
88be8648c9 is described below

commit 88be8648c9973e5711ff7ffe540b87865c985dbd
Author: JingsongLi <[email protected]>
AuthorDate: Mon Dec 30 19:52:14 2024 +0800

    [rest] Partition methods should check table first in RESTCatalog
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |   3 +-
 .../java/org/apache/paimon/catalog/Catalog.java    |   4 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 150 ++++++++++-----------
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  15 ++-
 4 files changed, 82 insertions(+), 90 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c63d92a144..02e662350f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -166,8 +166,7 @@ public abstract class AbstractCatalog implements Catalog {
         FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);
 
         if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) {
-            throw new UnsupportedOperationException(
-                    "The table is not partitioned table in metastore.");
+            return;
         }
 
         MetastoreClient.Factory metastoreFactory =
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 904c969107..0e1482c87b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -247,11 +247,11 @@ public interface Catalog extends AutoCloseable {
      * Drop the partition of the specify table.
      *
      * @param identifier path of the table to drop partition
-     * @param partitions the partition to be deleted
+     * @param partition the partition to be deleted
      * @throws TableNotExistException if the table does not exist
      * @throws PartitionNotExistException if the partition does not exist
      */
-    void dropPartition(Identifier identifier, Map<String, String> partitions)
+    void dropPartition(Identifier identifier, Map<String, String> partition)
             throws TableNotExistException, PartitionNotExistException;
 
     /**
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index c430e303b2..1a3d47cb26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -31,7 +31,6 @@ import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -65,7 +64,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.object.ObjectTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
@@ -85,9 +84,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
@@ -360,6 +359,12 @@ public class RESTCatalog implements Catalog {
     @Override
     public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
             throws TableNotExistException {
+        Table table = getTable(identifier);
+        Options options = Options.fromMap(table.options());
+        if (!options.get(METASTORE_PARTITIONED_TABLE)) {
+            return;
+        }
+
         try {
             CreatePartitionRequest request = new CreatePartitionRequest(identifier, partitionSpec);
             client.post(
@@ -376,27 +381,77 @@ public class RESTCatalog implements Catalog {
     }
 
     @Override
-    public void dropPartition(Identifier identifier, Map<String, String> partitions)
+    public void dropPartition(Identifier identifier, Map<String, String> partition)
             throws TableNotExistException, PartitionNotExistException {
         checkNotSystemTable(identifier, "dropPartition");
-        dropPartitionMetadata(identifier, partitions);
+
         Table table = getTable(identifier);
-        cleanPartitionsInFileSystem(table, partitions);
+        Options options = Options.fromMap(table.options());
+        if (options.get(METASTORE_PARTITIONED_TABLE)) {
+            try {
+                client.delete(
+                        resourcePaths.partitions(
+                                identifier.getDatabaseName(), identifier.getTableName()),
+                        new DropPartitionRequest(partition),
+                        headers());
+            } catch (NoSuchResourceException ignore) {
+                throw new PartitionNotExistException(identifier, partition);
+            } catch (ForbiddenException e) {
+                throw new TableNoPermissionException(identifier, e);
+            }
+        }
+
+        try (BatchTableCommit commit =
+                table.newBatchWriteBuilder().withOverwrite(partition).newCommit()) {
+            commit.commit(Collections.emptyList());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public List<PartitionEntry> listPartitions(Identifier identifier)
             throws TableNotExistException {
-        FileStoreTable table = (FileStoreTable) getTable(identifier);
-        boolean whetherSupportListPartitions =
-                Boolean.parseBoolean(
-                        table.options().get(CoreOptions.METASTORE_PARTITIONED_TABLE.key()));
-        if (whetherSupportListPartitions) {
-            RowType rowType = table.schema().logicalPartitionType();
-            return listPartitionsFromServer(identifier, rowType);
-        } else {
-            return getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+        Table table = getTable(identifier);
+        Options options = Options.fromMap(table.options());
+        if (!options.get(METASTORE_PARTITIONED_TABLE)) {
+            return table.newReadBuilder().newScan().listPartitionEntries();
         }
+
+        ListPartitionsResponse response;
+        try {
+            response =
+                    client.get(
+                            resourcePaths.partitions(
+                                    identifier.getDatabaseName(), identifier.getTableName()),
+                            ListPartitionsResponse.class,
+                            headers());
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+
+        if (response == null || response.getPartitions() == null) {
+            return Collections.emptyList();
+        }
+
+        RowType partitionType = table.rowType().project(table.partitionKeys());
+        InternalRowSerializer serializer = new InternalRowSerializer(partitionType);
+        String defaultName = options.get(PARTITION_DEFAULT_NAME);
+        List<PartitionEntry> result = new ArrayList<>();
+        for (PartitionResponse partition : response.getPartitions()) {
+            GenericRow row =
+                    convertSpecToInternalRow(partition.getSpec(), partitionType, defaultName);
+            result.add(
+                    new PartitionEntry(
+                            serializer.toBinaryRow(row).copy(),
+                            partition.getRecordCount(),
+                            partition.getFileSizeInBytes(),
+                            partition.getFileCount(),
+                            partition.getLastFileCreationTime()));
+        }
+        return result;
     }
 
     @Override
@@ -444,41 +499,6 @@ public class RESTCatalog implements Catalog {
         return table;
     }
 
-    private List<PartitionEntry> listPartitionsFromServer(Identifier identifier, RowType rowType)
-            throws TableNotExistException {
-        try {
-            ListPartitionsResponse response =
-                    client.get(
-                            resourcePaths.partitions(
-                                    identifier.getDatabaseName(), identifier.getTableName()),
-                            ListPartitionsResponse.class,
-                            headers());
-            if (response != null && response.getPartitions() != null) {
-                return response.getPartitions().stream()
-                        .map(p -> convertToPartitionEntry(p, rowType))
-                        .collect(Collectors.toList());
-            } else {
-                return Collections.emptyList();
-            }
-        } catch (NoSuchResourceException e) {
-            throw new TableNotExistException(identifier);
-        } catch (ForbiddenException e) {
-            throw new TableNoPermissionException(identifier, e);
-        }
-    }
-
-    private void cleanPartitionsInFileSystem(Table table, Map<String, String> partitions) {
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        try (FileStoreCommit commit =
-                fileStoreTable
-                        .store()
-                        .newCommit(
-                                createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
-            commit.dropPartitions(
-                    Collections.singletonList(partitions), BatchWriteBuilder.COMMIT_IDENTIFIER);
-        }
-    }
-
     private GetTableResponse getTableResponse(Identifier identifier) throws TableNotExistException {
         try {
             return client.get(
@@ -492,23 +512,6 @@ public class RESTCatalog implements Catalog {
         }
     }
 
-    private boolean dropPartitionMetadata(Identifier identifier, Map<String, String> partitions)
-            throws TableNoPermissionException, PartitionNotExistException {
-        try {
-            DropPartitionRequest request = new DropPartitionRequest(partitions);
-            client.delete(
-                    resourcePaths.partitions(
-                            identifier.getDatabaseName(), identifier.getTableName()),
-                    request,
-                    headers());
-            return true;
-        } catch (NoSuchResourceException ignore) {
-            throw new PartitionNotExistException(identifier, partitions);
-        } catch (ForbiddenException e) {
-            throw new TableNoPermissionException(identifier, e);
-        }
-    }
-
     private static Map<String, String> configHeaders(Map<String, String> properties) {
         return RESTUtil.extractPrefixMap(properties, "header.");
     }
@@ -540,17 +543,6 @@ public class RESTCatalog implements Catalog {
         return refreshExecutor;
     }
 
-    private PartitionEntry convertToPartitionEntry(PartitionResponse partition, RowType rowType) {
-        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
-        GenericRow row = convertSpecToInternalRow(partition.getSpec(), rowType, null);
-        return new PartitionEntry(
-                serializer.toBinaryRow(row).copy(),
-                partition.getRecordCount(),
-                partition.getFileSizeInBytes(),
-                partition.getFileCount(),
-                partition.getLastFileCreationTime());
-    }
-
     private static FileIO getFileIOFromOptions(CatalogContext context) {
         try {
             Options options = context.options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 67103aaa52..c41c1d2a9c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -34,7 +34,6 @@ import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
-import org.apache.paimon.rest.responses.PartitionResponse;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
 
@@ -332,10 +331,12 @@ public class RESTCatalogTest {
     @Test
     public void testCreatePartition() throws Exception {
         String databaseName = MockRESTMessage.databaseName();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+
         Map<String, String> partitionSpec = new HashMap<>();
         partitionSpec.put("p1", "v1");
-        PartitionResponse response = MockRESTMessage.partitionResponse();
-        mockResponse(mapper.writeValueAsString(response), 200);
+        mockResponse(mapper.writeValueAsString(MockRESTMessage.partitionResponse()), 200);
         assertDoesNotThrow(
                 () ->
                         restCatalog.createPartition(
@@ -386,11 +387,12 @@ public class RESTCatalogTest {
     @Test
     public void testDropPartitionWhenPartitionNoExist() throws Exception {
         String databaseName = MockRESTMessage.databaseName();
+        GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition();
+        mockResponse(mapper.writeValueAsString(response), 200);
+
         Map<String, String> partitionSpec = new HashMap<>();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
         partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
         mockResponse(mapper.writeValueAsString(""), 404);
-        mockResponse(mapper.writeValueAsString(response), 200);
         assertThrows(
                 Catalog.PartitionNotExistException.class,
                 () ->
@@ -418,7 +420,6 @@ public class RESTCatalogTest {
         Map<String, String> partitionSpec = new HashMap<>();
         GetTableResponse response = MockRESTMessage.getTableResponse();
         partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
-        mockResponse(mapper.writeValueAsString(""), 200);
         mockResponse("", 404);
         assertThrows(
                 Catalog.TableNotExistException.class,
@@ -442,7 +443,7 @@ public class RESTCatalogTest {
     @Test
     public void testListPartitionsFromFile() throws Exception {
         String databaseName = MockRESTMessage.databaseName();
-        GetTableResponse response = MockRESTMessage.getTableResponse();
+        GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition();
         mockResponse(mapper.writeValueAsString(response), 200);
         mockResponse(mapper.writeValueAsString(response), 200);
         List<PartitionEntry> partitionEntries =

Reply via email to