This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 035c89c020 [core] Introduce batch partition methods in catalog (#4866)
035c89c020 is described below
commit 035c89c020654b7915e023752e6dd9df2fa9141c
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 8 21:52:29 2025 +0800
[core] Introduce batch partition methods in catalog (#4866)
---
.../java/org/apache/paimon/AbstractFileStore.java | 6 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 40 ++--
.../org/apache/paimon/catalog/CachingCatalog.java | 15 +-
.../java/org/apache/paimon/catalog/Catalog.java | 111 +++++------
.../org/apache/paimon/catalog/CatalogUtils.java | 2 +-
.../org/apache/paimon/catalog/DelegateCatalog.java | 36 ++--
.../paimon/operation/AbstractFileStoreScan.java | 6 +
.../org/apache/paimon/operation/FileStoreScan.java | 2 +
.../apache/paimon/operation/ManifestsReader.java | 11 ++
.../apache/paimon/privilege/PrivilegedCatalog.java | 28 ++-
.../java/org/apache/paimon/rest/RESTCatalog.java | 64 ++-----
.../paimon/table/FallbackReadFileStoreTable.java | 8 +
.../paimon/table/source/AbstractDataTableScan.java | 6 +
.../apache/paimon/table/source/InnerTableScan.java | 4 +
.../table/source/snapshot/SnapshotReader.java | 2 +
.../table/source/snapshot/SnapshotReaderImpl.java | 6 +
.../apache/paimon/table/system/AuditLogTable.java | 12 ++
.../org/apache/paimon/rest/RESTCatalogTest.java | 102 ----------
.../java/org/apache/paimon/flink/FlinkCatalog.java | 8 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 209 +++++++++++++++++++--
.../org/apache/paimon/hive/HiveCatalogTest.java | 55 ++++++
21 files changed, 457 insertions(+), 276 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 953096c0b5..22987c6292 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -232,7 +232,11 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
}
protected ManifestsReader newManifestsReader(boolean forWrite) {
- return new ManifestsReader(partitionType, snapshotManager(), manifestListFactory(forWrite));
+ return new ManifestsReader(
+ partitionType,
+ options.partitionDefaultName(),
+ snapshotManager(),
+ manifestListFactory(forWrite));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 439f456efb..a6790004a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -50,10 +50,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -160,32 +158,11 @@ public abstract class AbstractCatalog implements Catalog {
protected abstract Database getDatabaseImpl(String name) throws
DatabaseNotExistException;
@Override
- public void createPartition(Identifier identifier, Map<String, String>
partitionSpec)
- throws TableNotExistException {
- Identifier tableIdentifier =
- Identifier.create(identifier.getDatabaseName(),
identifier.getTableName());
- FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);
-
- if (table.partitionKeys().isEmpty() ||
!table.coreOptions().partitionedTableInMetastore()) {
- return;
- }
-
- MetastoreClient.Factory metastoreFactory =
- table.catalogEnvironment().metastoreClientFactory();
- if (metastoreFactory == null) {
- throw new UnsupportedOperationException(
- "The catalog must have metastore to create partition.");
- }
-
- try (MetastoreClient client = metastoreFactory.create()) {
- client.addPartition(new LinkedHashMap<>(partitionSpec));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {}
@Override
- public void dropPartition(Identifier identifier, Map<String, String>
partitionSpec)
+ public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
throws TableNotExistException {
checkNotSystemTable(identifier, "dropPartition");
Table table = getTable(identifier);
@@ -195,11 +172,18 @@ public abstract class AbstractCatalog implements Catalog {
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
- commit.dropPartitions(
- Collections.singletonList(partitionSpec),
BatchWriteBuilder.COMMIT_IDENTIFIER);
+ commit.dropPartitions(partitions,
BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}
+ @Override
+ public void alterPartitions(Identifier identifier, List<Partition>
partitions)
+ throws TableNotExistException {}
+
+ @Override
+ public void markDonePartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {}
+
@Override
public List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException {
return listPartitionsFromFileSystem(getTable(identifier));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 4796276972..23408e5692 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -295,9 +295,18 @@ public class CachingCatalog extends DelegateCatalog {
}
@Override
- public void dropPartition(Identifier identifier, Map<String, String>
partitions)
- throws TableNotExistException, PartitionNotExistException {
- wrapped.dropPartition(identifier, partitions);
+ public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ wrapped.dropPartitions(identifier, partitions);
+ if (partitionCache != null) {
+ partitionCache.invalidate(identifier);
+ }
+ }
+
+ @Override
+ public void alterPartitions(Identifier identifier, List<Partition>
partitions)
+ throws TableNotExistException {
+ wrapped.alterPartitions(identifier, partitions);
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index e90d3c79c5..e7d07d6dc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -73,6 +73,8 @@ public interface Catalog extends AutoCloseable {
/** Return a boolean that indicates whether this catalog is case-sensitive. */
boolean caseSensitive();
+ // ======================= database methods ===============================
+
/**
* Get the names of all databases in this catalog.
*
@@ -139,6 +141,8 @@ public interface Catalog extends AutoCloseable {
void alterDatabase(String name, List<PropertyChange> changes, boolean
ignoreIfNotExists)
throws DatabaseNotExistException;
+ // ======================= table methods ===============================
+
/**
* Return a {@link Table} identified by the given {@link Identifier}.
*
@@ -231,52 +235,81 @@ public interface Catalog extends AutoCloseable {
default void invalidateTable(Identifier identifier) {}
/**
- * Create the partition of the specify table.
+ * Modify an existing table from a {@link SchemaChange}.
+ *
+ * <p>NOTE: System tables can not be altered.
+ *
+ * @param identifier path of the table to be modified
+ * @param change the schema change
+ * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws TableNotExistException if the table does not exist
+ */
+ default void alterTable(Identifier identifier, SchemaChange change,
boolean ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ alterTable(identifier, Collections.singletonList(change),
ignoreIfNotExists);
+ }
+
+ // ======================= partition methods ===============================
+
+ /**
+ * Create partitions of the specify table.
*
* <p>Only catalog with metastore can support this method, and only table with
* 'metastore.partitioned-table' can support this method.
*
- * @param identifier path of the table to drop partition
- * @param partitionSpec the partition to be created
+ * @param identifier path of the table to create partitions
+ * @param partitions partitions to be created
* @throws TableNotExistException if the table does not exist
*/
- void createPartition(Identifier identifier, Map<String, String>
partitionSpec)
+ void createPartitions(Identifier identifier, List<Map<String, String>>
partitions)
throws TableNotExistException;
/**
- * Drop the partition of the specify table.
+ * Drop partitions of the specify table.
*
- * @param identifier path of the table to drop partition
- * @param partition the partition to be deleted
+ * @param identifier path of the table to drop partitions
+ * @param partitions partitions to be deleted
* @throws TableNotExistException if the table does not exist
- * @throws PartitionNotExistException if the partition does not exist
*/
- void dropPartition(Identifier identifier, Map<String, String> partition)
- throws TableNotExistException, PartitionNotExistException;
+ void dropPartitions(Identifier identifier, List<Map<String, String>>
partitions)
+ throws TableNotExistException;
/**
- * Get Partition of all partitions of the table.
+ * Alter partitions of the specify table.
*
- * @param identifier path of the table to list partitions
+ * <p>Only catalog with metastore can support this method, and only table with
+ * 'metastore.partitioned-table' can support this method.
+ *
+ * @param identifier path of the table to alter partitions
+ * @param partitions partitions to be altered
* @throws TableNotExistException if the table does not exist
*/
- List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException;
+ void alterPartitions(Identifier identifier, List<Partition> partitions)
+ throws TableNotExistException;
/**
- * Modify an existing table from a {@link SchemaChange}.
+ * Mark partitions done of the specify table.
*
- * <p>NOTE: System tables can not be altered.
+ * <p>Only catalog with metastore can support this method, and only table with
+ * 'metastore.partitioned-table' can support this method.
*
- * @param identifier path of the table to be modified
- * @param change the schema change
- * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
- * false, throw an exception, if set to true, do nothing.
+ * @param identifier path of the table to mark done partitions
+ * @param partitions partitions to be marked done
* @throws TableNotExistException if the table does not exist
*/
- default void alterTable(Identifier identifier, SchemaChange change,
boolean ignoreIfNotExists)
- throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- alterTable(identifier, Collections.singletonList(change),
ignoreIfNotExists);
- }
+ void markDonePartitions(Identifier identifier, List<Map<String, String>>
partitions)
+ throws TableNotExistException;
+
+ /**
+ * Get Partition of all partitions of the table.
+ *
+ * @param identifier path of the table to list partitions
+ * @throws TableNotExistException if the table does not exist
+ */
+ List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException;
+
+ // ======================= view methods ===============================
/**
* Return a {@link View} identified by the given {@link Identifier}.
@@ -340,6 +373,8 @@ public interface Catalog extends AutoCloseable {
throw new UnsupportedOperationException();
}
+ // ======================= repair methods ===============================
+
/**
* Repair the entire Catalog, repair the metadata in the metastore consistent with the metadata
* in the filesystem, register missing tables in the metastore.
@@ -508,36 +543,6 @@ public interface Catalog extends AutoCloseable {
}
}
- /** Exception for trying to operate on a partition that doesn't exist. */
- class PartitionNotExistException extends Exception {
-
- private static final String MSG = "Partition %s do not exist in the table %s.";
-
- private final Identifier identifier;
-
- private final Map<String, String> partitionSpec;
-
- public PartitionNotExistException(
- Identifier identifier, Map<String, String> partitionSpec) {
- this(identifier, partitionSpec, null);
- }
-
- public PartitionNotExistException(
- Identifier identifier, Map<String, String> partitionSpec,
Throwable cause) {
- super(String.format(MSG, partitionSpec, identifier.getFullName()),
cause);
- this.identifier = identifier;
- this.partitionSpec = partitionSpec;
- }
-
- public Identifier identifier() {
- return identifier;
- }
-
- public Map<String, String> partitionSpec() {
- return partitionSpec;
- }
- }
-
/** Exception for trying to alter a column that already exists. */
class ColumnAlreadyExistException extends Exception {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index cddb76e683..fabfa50fc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -131,7 +131,7 @@ public class CatalogUtils {
InternalRowPartitionComputer computer =
new InternalRowPartitionComputer(
options.get(PARTITION_DEFAULT_NAME),
- table.rowType(),
+ table.rowType().project(table.partitionKeys()),
table.partitionKeys().toArray(new String[0]),
options.get(PARTITION_GENERATE_LEGCY_NAME));
List<PartitionEntry> partitionEntries =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index e2d1a94cfa..23c50e9986 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -119,6 +119,30 @@ public class DelegateCatalog implements Catalog {
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
}
+ @Override
+ public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ wrapped.createPartitions(identifier, partitions);
+ }
+
+ @Override
+ public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ wrapped.dropPartitions(identifier, partitions);
+ }
+
+ @Override
+ public void alterPartitions(Identifier identifier, List<Partition>
partitions)
+ throws TableNotExistException {
+ wrapped.alterPartitions(identifier, partitions);
+ }
+
+ @Override
+ public void markDonePartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ wrapped.markDonePartitions(identifier, partitions);
+ }
+
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
return wrapped.getTable(identifier);
@@ -152,18 +176,6 @@ public class DelegateCatalog implements Catalog {
wrapped.renameView(fromView, toView, ignoreIfNotExists);
}
- @Override
- public void createPartition(Identifier identifier, Map<String, String>
partitions)
- throws TableNotExistException {
- wrapped.createPartition(identifier, partitions);
- }
-
- @Override
- public void dropPartition(Identifier identifier, Map<String, String>
partitions)
- throws TableNotExistException, PartitionNotExistException {
- wrapped.dropPartition(identifier, partitions);
- }
-
@Override
public List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException {
return wrapped.listPartitions(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 4f8a1f3264..27ba4703b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -118,6 +118,12 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan withPartitionsFilter(List<Map<String, String>>
partitions) {
+ manifestsReader.withPartitionsFilter(partitions);
+ return this;
+ }
+
@Override
public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
manifestsReader.withPartitionFilter(predicate);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 179d16de6c..99ae3ef47d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -53,6 +53,8 @@ public interface FileStoreScan {
FileStoreScan withPartitionFilter(List<BinaryRow> partitions);
+ FileStoreScan withPartitionsFilter(List<Map<String, String>> partitions);
+
FileStoreScan withPartitionFilter(PartitionPredicate predicate);
FileStoreScan withBucket(int bucket);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index 5ee468af20..2eaa3646f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -34,13 +34,17 @@ import javax.annotation.concurrent.ThreadSafe;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
+
/** A util class to read manifest files. */
@ThreadSafe
public class ManifestsReader {
private final RowType partitionType;
+ private final String partitionDefaultValue;
private final SnapshotManager snapshotManager;
private final ManifestList.Factory manifestListFactory;
@@ -48,9 +52,11 @@ public class ManifestsReader {
public ManifestsReader(
RowType partitionType,
+ String partitionDefaultValue,
SnapshotManager snapshotManager,
ManifestList.Factory manifestListFactory) {
this.partitionType = partitionType;
+ this.partitionDefaultValue = partitionDefaultValue;
this.snapshotManager = snapshotManager;
this.manifestListFactory = manifestListFactory;
}
@@ -65,6 +71,11 @@ public class ManifestsReader {
return this;
}
+ public ManifestsReader withPartitionsFilter(List<Map<String, String>>
partitions) {
+ return withPartitionFilter(
+ createBinaryPartitions(partitions, partitionType,
partitionDefaultValue));
+ }
+
public ManifestsReader withPartitionFilter(PartitionPredicate predicate) {
this.partitionFilter = predicate;
return this;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index 35822471a2..6be09fa9b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
@@ -143,10 +144,31 @@ public class PrivilegedCatalog extends DelegateCatalog {
}
@Override
- public void dropPartition(Identifier identifier, Map<String, String>
partitions)
- throws TableNotExistException, PartitionNotExistException {
+ public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+ wrapped.createPartitions(identifier, partitions);
+ }
+
+ @Override
+ public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+ wrapped.dropPartitions(identifier, partitions);
+ }
+
+ @Override
+ public void alterPartitions(Identifier identifier, List<Partition>
partitions)
+ throws TableNotExistException {
+ privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
+ wrapped.alterPartitions(identifier, partitions);
+ }
+
+ @Override
+ public void markDonePartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
- wrapped.dropPartition(identifier, partitions);
+ wrapped.markDonePartitions(identifier, partitions);
}
public void createPrivilegedUser(String user, String password) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index c547656e7c..8659fbf655 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -41,9 +41,7 @@ import
org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
-import org.apache.paimon.rest.requests.CreatePartitionRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
-import org.apache.paimon.rest.requests.DropPartitionRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
@@ -53,7 +51,6 @@ import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
-import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
@@ -62,7 +59,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
-import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
@@ -84,7 +80,6 @@ import java.util.concurrent.ScheduledExecutorService;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
-import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
@@ -362,56 +357,27 @@ public class RESTCatalog implements Catalog {
}
@Override
- public void createPartition(Identifier identifier, Map<String, String>
partitionSpec)
+ public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
throws TableNotExistException {
- Table table = getTable(identifier);
- Options options = Options.fromMap(table.options());
- if (!options.get(METASTORE_PARTITIONED_TABLE)) {
- return;
- }
-
- try {
- CreatePartitionRequest request = new
CreatePartitionRequest(identifier, partitionSpec);
- client.post(
- resourcePaths.partitions(
- identifier.getDatabaseName(),
identifier.getTableName()),
- request,
- PartitionResponse.class,
- headers());
- } catch (NoSuchResourceException e) {
- throw new TableNotExistException(identifier);
- } catch (ForbiddenException e) {
- throw new TableNoPermissionException(identifier, e);
- }
+ throw new UnsupportedOperationException();
}
@Override
- public void dropPartition(Identifier identifier, Map<String, String>
partition)
- throws TableNotExistException, PartitionNotExistException {
- checkNotSystemTable(identifier, "dropPartition");
+ public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ throw new UnsupportedOperationException();
+ }
- Table table = getTable(identifier);
- Options options = Options.fromMap(table.options());
- if (options.get(METASTORE_PARTITIONED_TABLE)) {
- try {
- client.delete(
- resourcePaths.partitions(
- identifier.getDatabaseName(),
identifier.getTableName()),
- new DropPartitionRequest(partition),
- headers());
- } catch (NoSuchResourceException ignore) {
- throw new PartitionNotExistException(identifier, partition);
- } catch (ForbiddenException e) {
- throw new TableNoPermissionException(identifier, e);
- }
- }
+ @Override
+ public void alterPartitions(Identifier identifier, List<Partition>
partitions)
+ throws TableNotExistException {
+ throw new UnsupportedOperationException();
+ }
- try (BatchTableCommit commit =
-
table.newBatchWriteBuilder().withOverwrite(partition).newCommit()) {
- commit.commit(Collections.emptyList());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ @Override
+ public void markDonePartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ throw new UnsupportedOperationException();
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index e3e290f060..eb405a5d5d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
@@ -258,6 +259,13 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
return this;
}
+ @Override
+ public InnerTableScan withPartitionsFilter(List<Map<String, String>>
partitions) {
+ mainScan.withPartitionsFilter(partitions);
+ fallbackScan.withPartitionsFilter(partitions);
+ return this;
+ }
+
@Override
public Scan withBucketFilter(Filter<Integer> bucketFilter) {
mainScan.withBucketFilter(bucketFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index a5810bfc24..59b11281cc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -101,6 +101,12 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
return this;
}
+ @Override
+ public AbstractDataTableScan withPartitionsFilter(List<Map<String,
String>> partitions) {
+ snapshotReader.withPartitionsFilter(partitions);
+ return this;
+ }
+
@Override
public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
snapshotReader.withLevelFilter(levelFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index c2425ff16f..f7d609187d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -39,6 +39,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ default InnerTableScan withPartitionsFilter(List<Map<String, String>>
partitions) {
+ return this;
+ }
+
default InnerTableScan withPartitionFilter(List<BinaryRow> partitions) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index f3e0a92b8f..3329ab95fc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -73,6 +73,8 @@ public interface SnapshotReader {
SnapshotReader withPartitionFilter(List<BinaryRow> partitions);
+ SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions);
+
SnapshotReader withMode(ScanMode scanMode);
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 43a6d3c872..032738c38c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -192,6 +192,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withPartitionsFilter(List<Map<String, String>>
partitions) {
+ scan.withPartitionsFilter(partitions);
+ return this;
+ }
+
@Override
public SnapshotReader withFilter(Predicate predicate) {
List<String> partitionKeys = tableSchema.partitionKeys();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 1cb967f8d1..005535094e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -313,6 +313,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withPartitionsFilter(List<Map<String, String>>
partitions) {
+ wrapped.withPartitionsFilter(partitions);
+ return this;
+ }
+
@Override
public SnapshotReader withMode(ScanMode scanMode) {
wrapped.withMode(scanMode);
@@ -446,6 +452,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public InnerTableScan withPartitionsFilter(List<Map<String, String>>
partitions) {
+ batchScan.withPartitionsFilter(partitions);
+ return this;
+ }
+
@Override
public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
batchScan.withBucketFilter(bucketFilter);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 627b02c1e3..344807b4c9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -50,9 +50,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
@@ -317,106 +315,6 @@ public class RESTCatalogTest {
() -> restCatalog.dropTable(Identifier.create(databaseName, tableName), false));
}
- @Test
- public void testCreatePartition() throws Exception {
- String databaseName = MockRESTMessage.databaseName();
- GetTableResponse response = MockRESTMessage.getTableResponse();
- mockResponse(mapper.writeValueAsString(response), 200);
-
- Map<String, String> partitionSpec = new HashMap<>();
- partitionSpec.put("p1", "v1");
- mockResponse(mapper.writeValueAsString(MockRESTMessage.partitionResponse()), 200);
- assertDoesNotThrow(
- () ->
- restCatalog.createPartition(
- Identifier.create(databaseName, "table"), partitionSpec));
- }
-
- @Test
- public void testCreatePartitionWhenTableNotExist() throws Exception {
- String databaseName = MockRESTMessage.databaseName();
- Map<String, String> partitionSpec = new HashMap<>();
- partitionSpec.put("p1", "v1");
- mockResponse("", 404);
- assertThrows(
- Catalog.TableNotExistException.class,
- () ->
- restCatalog.createPartition(
- Identifier.create(databaseName, "table"), partitionSpec));
- }
-
- @Test
- public void testCreatePartitionWhenTableNoPermissionException() throws Exception {
- String databaseName = MockRESTMessage.databaseName();
- Map<String, String> partitionSpec = new HashMap<>();
- partitionSpec.put("p1", "v1");
- mockResponse("", 403);
- assertThrows(
- Catalog.TableNoPermissionException.class,
- () ->
- restCatalog.createPartition(
- Identifier.create(databaseName, "table"), partitionSpec));
- }
-
- @Test
- public void testDropPartition() throws Exception {
- String databaseName = MockRESTMessage.databaseName();
- Map<String, String> partitionSpec = new HashMap<>();
- GetTableResponse response = MockRESTMessage.getTableResponse();
- partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
- mockResponse(mapper.writeValueAsString(""), 200);
- mockResponse(mapper.writeValueAsString(response), 200);
- assertThrows(
- RuntimeException.class,
- () ->
- restCatalog.dropPartition(
- Identifier.create(databaseName, "table"), partitionSpec));
- }
-
- @Test
- public void testDropPartitionWhenPartitionNoExist() throws Exception {
- String databaseName = MockRESTMessage.databaseName();
- GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition();
- mockResponse(mapper.writeValueAsString(response), 200);
-
- Map<String, String> partitionSpec = new HashMap<>();
- partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
- mockResponse(mapper.writeValueAsString(""), 404);
- assertThrows(
- Catalog.PartitionNotExistException.class,
- () ->
- restCatalog.dropPartition(
- Identifier.create(databaseName, "table"), partitionSpec));
- }
-
- @Test
- public void testDropPartitionWhenTableNoPermission() throws Exception {
- String databaseName = MockRESTMessage.databaseName();
- Map<String, String> partitionSpec = new HashMap<>();
- GetTableResponse response = MockRESTMessage.getTableResponse();
- partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
- mockResponse(mapper.writeValueAsString(""), 403);
- assertThrows(
- Catalog.TableNoPermissionException.class,
- () ->
- restCatalog.dropPartition(
- Identifier.create(databaseName, "table"), partitionSpec));
- }
-
- @Test
- public void testDropPartitionWhenTableNoExist() throws Exception {
- String databaseName = MockRESTMessage.databaseName();
- Map<String, String> partitionSpec = new HashMap<>();
- GetTableResponse response = MockRESTMessage.getTableResponse();
- partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
- mockResponse("", 404);
- assertThrows(
- Catalog.TableNotExistException.class,
- () ->
- restCatalog.dropPartition(
- Identifier.create(databaseName, "table"), partitionSpec));
- }
-
@Test
public void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception {
String databaseName = MockRESTMessage.databaseName();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ec3c4a47a6..7d19db3177 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -1421,7 +1421,8 @@ public class FlinkCatalog extends AbstractCatalog {
try {
Identifier identifier = toIdentifier(tablePath);
- catalog.createPartition(identifier, partitionSpec.getPartitionSpec());
+ catalog.createPartitions(
+ identifier, Collections.singletonList(partitionSpec.getPartitionSpec()));
} catch (Catalog.TableNotExistException e) {
throw new CatalogException(e);
}
@@ -1440,11 +1441,10 @@ public class FlinkCatalog extends AbstractCatalog {
try {
Identifier identifier = toIdentifier(tablePath);
- catalog.dropPartition(identifier, partitionSpec.getPartitionSpec());
+ catalog.dropPartitions(
+ identifier, Collections.singletonList(partitionSpec.getPartitionSpec()));
} catch (Catalog.TableNotExistException e) {
throw new CatalogException(e);
- } catch (Catalog.PartitionNotExistException e) {
- throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
}
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index a213909beb..5afb60e84f 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -50,6 +50,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -105,6 +107,7 @@ import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
@@ -145,6 +148,8 @@ public class HiveCatalog extends AbstractCatalog {
public static final String HIVE_SITE_FILE = "hive-site.xml";
private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";
private static final int DEFAULT_TABLE_BATCH_SIZE = 300;
+ private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime";
+
private final HiveConf hiveConf;
private final String clientClassName;
private final Options options;
@@ -344,40 +349,177 @@ public class HiveCatalog extends AbstractCatalog {
}
}
+ private boolean metastorePartitioned(TableSchema schema) {
+ CoreOptions options = CoreOptions.fromMap(schema.options());
+ return (!schema.partitionKeys().isEmpty() && options.partitionedTableInMetastore())
+ || options.tagToPartitionField() != null;
+ }
+
+ @Override
+ public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
+ throws TableNotExistException {
+ Identifier tableIdentifier =
+ Identifier.create(identifier.getDatabaseName(), identifier.getTableName());
+ Table hmsTable = getHmsTable(tableIdentifier);
+ Path location = getTableLocation(tableIdentifier, hmsTable);
+ TableSchema schema = getDataTableSchema(tableIdentifier, hmsTable);
+
+ if (!metastorePartitioned(schema)) {
+ return;
+ }
+
+ int currentTime = (int) (System.currentTimeMillis() / 1000);
+
+ try {
+ List<Partition> hivePartitions =
+ toHivePartitions(
+ identifier,
+ location.toString(),
+ hmsTable.getSd(),
+ partitions,
+ currentTime);
+ clients.execute(client -> client.add_partitions(hivePartitions, true, false));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
+ throws TableNotExistException {
+ TableSchema schema = getDataTableSchema(identifier);
+ CoreOptions options = CoreOptions.fromMap(schema.options());
+ boolean tagToPart = options.tagToPartitionField() != null;
+ if (metastorePartitioned(schema)) {
+ List<Map<String, String>> metaPartitions =
+ tagToPart
+ ? partitions
+ : removePartitionsExistsInOtherBranches(identifier, partitions);
+ for (Map<String, String> part : metaPartitions) {
+ List<String> partitionValues = new ArrayList<>(part.values());
+ try {
+ clients.execute(
+ client ->
+ client.dropPartition(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ partitionValues,
+ false));
+ } catch (NoSuchObjectException e) {
+ // do nothing if the partition not exists
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ if (!tagToPart) {
+ super.dropPartitions(identifier, partitions);
+ }
+ }
+
@Override
- public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
+ public void alterPartitions(
+ Identifier identifier, List<org.apache.paimon.partition.Partition> partitions)
throws TableNotExistException {
TableSchema tableSchema = getDataTableSchema(identifier);
if (!tableSchema.partitionKeys().isEmpty()
- && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()
- && !partitionExistsInOtherBranches(identifier, partitionSpec)) {
+ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
+ for (org.apache.paimon.partition.Partition partition : partitions) {
+ Map<String, String> spec = partition.spec();
+ List<String> partitionValues =
+ tableSchema.partitionKeys().stream()
+ .map(spec::get)
+ .collect(Collectors.toList());
+
+ Map<String, String> statistic = new HashMap<>();
+ statistic.put(NUM_FILES_PROP, String.valueOf(partition.fileCount()));
+ statistic.put(TOTAL_SIZE_PROP, String.valueOf(partition.fileSizeInBytes()));
+ statistic.put(NUM_ROWS_PROP, String.valueOf(partition.recordCount()));
+
+ String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000);
+ statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+ // just for being compatible with hive metastore
+ statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+ try {
+ Partition hivePartition =
+ clients.run(
+ client ->
+ client.getPartition(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ partitionValues));
+ hivePartition.setValues(partitionValues);
+ hivePartition.setLastAccessTime(
+ (int) (partition.lastFileCreationTime() / 1000));
+ hivePartition.getParameters().putAll(statistic);
+ clients.execute(
+ client ->
+ client.alter_partition(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ hivePartition));
+ } catch (NoSuchObjectException e) {
+ // do nothing if the partition not exists
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<org.apache.paimon.partition.Partition> listPartitions(Identifier identifier)
+ throws TableNotExistException {
+ FileStoreTable table = (FileStoreTable) getTable(identifier);
+ String tagToPartitionField = table.coreOptions().tagToPartitionField();
+ if (tagToPartitionField != null) {
try {
- // Do not close client, it is for HiveCatalog
- @SuppressWarnings("resource")
- HiveMetastoreClient metastoreClient =
- new HiveMetastoreClient(
- new Identifier(
- identifier.getDatabaseName(), identifier.getTableName()),
- clients);
- metastoreClient.dropPartition(new LinkedHashMap<>(partitionSpec));
+ List<Partition> partitions =
+ clients.run(
+ client ->
+ client.listPartitions(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ Short.MAX_VALUE));
+ return partitions.stream()
+ .map(
+ part ->
+ new org.apache.paimon.partition.Partition(
+ Collections.singletonMap(
+ tagToPartitionField,
+ part.getValues().get(0)),
+ 1L,
+ 1L,
+ 1L,
+ System.currentTimeMillis()))
+ .collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- super.dropPartition(identifier, partitionSpec);
+ return listPartitionsFromFileSystem(table);
}
- private boolean partitionExistsInOtherBranches(
- Identifier identifier, Map<String, String> partitionSpec)
- throws TableNotExistException {
+ private List<Map<String, String>> removePartitionsExistsInOtherBranches(
+ Identifier identifier, List<Map<String, String>> inputs) throws TableNotExistException {
FileStoreTable mainTable =
(FileStoreTable)
getTable(
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName()));
+
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
+ mainTable.coreOptions().partitionDefaultName(),
+ mainTable.rowType().project(mainTable.partitionKeys()),
+ mainTable.partitionKeys().toArray(new String[0]),
+ mainTable.coreOptions().legacyPartitionName());
List<String> branchNames = new ArrayList<>(mainTable.branchManager().branches());
branchNames.add(DEFAULT_MAIN_BRANCH);
+ Set<Map<String, String>> inputsToRemove = new HashSet<>(inputs);
for (String branchName : branchNames) {
if (branchName.equals(identifier.getBranchNameOrDefault())) {
continue;
@@ -389,12 +531,13 @@ public class HiveCatalog extends AbstractCatalog {
continue;
}
- FileStoreTable table = mainTable.switchToBranch(branchName);
- if (!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty()) {
- return true;
- }
+ mainTable.switchToBranch(branchName).newScan()
+ .withPartitionsFilter(new ArrayList<>(inputsToRemove)).listPartitions().stream()
+ .map(partitionComputer::generatePartValues)
+ .forEach(inputsToRemove::remove);
}
- return false;
+
+ return new ArrayList<>(inputsToRemove);
}
@Override
@@ -1464,4 +1607,30 @@ public class HiveCatalog extends AbstractCatalog {
return DEFAULT_TABLE_BATCH_SIZE;
}
}
+
+ private List<Partition> toHivePartitions(
+ Identifier identifier,
+ String tablePath,
+ StorageDescriptor sd,
+ List<Map<String, String>> partitions,
+ int currentTime) {
+ List<Partition> hivePartitions = new ArrayList<>();
+ for (Map<String, String> partitionSpec : partitions) {
+ Partition hivePartition = new Partition();
+ StorageDescriptor newSd = new StorageDescriptor(sd);
+ newSd.setLocation(
+ tablePath
+ + "/"
+ + PartitionPathUtils.generatePartitionPath(
+ new LinkedHashMap<>(partitionSpec)));
+ hivePartition.setDbName(identifier.getDatabaseName());
+ hivePartition.setTableName(identifier.getTableName());
+ hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
+ hivePartition.setSd(newSd);
+ hivePartition.setCreateTime(currentTime);
+ hivePartition.setLastAccessTime(currentTime);
+ hivePartitions.add(hivePartition);
+ }
+ return hivePartitions;
+ }
}
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index e733ec16c8..d96fac808c 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;
@@ -55,6 +56,8 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
+import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
+import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
import static org.assertj.core.api.Assertions.assertThat;
@@ -448,4 +451,56 @@ public class HiveCatalogTest extends CatalogTestBase {
externalWarehouseCatalog.close();
}
+
+ @Test
+ public void testTagToPartitionTable() throws Exception {
+ String databaseName = "testTagToPartitionTable";
+ catalog.dropDatabase(databaseName, true, true);
+ catalog.createDatabase(databaseName, true);
+ Identifier identifier = Identifier.create(databaseName, "table");
+ catalog.createTable(
+ identifier,
+ Schema.newBuilder()
+ .option(METASTORE_TAG_TO_PARTITION.key(), "dt")
+ .column("col", DataTypes.INT())
+ .column("dt", DataTypes.STRING())
+ .build(),
+ true);
+
+ catalog.createPartitions(
+ identifier,
+ Arrays.asList(
+ Collections.singletonMap("dt", "20250101"),
+ Collections.singletonMap("dt", "20250102")));
+ assertThat(catalog.listPartitions(identifier).stream().map(Partition::spec))
+ .containsExactlyInAnyOrder(
+ Collections.singletonMap("dt", "20250102"),
+ Collections.singletonMap("dt", "20250101"));
+ }
+
+ @Test
+ public void testPartitionTable() throws Exception {
+ String databaseName = "testPartitionTable";
+ catalog.dropDatabase(databaseName, true, true);
+ catalog.createDatabase(databaseName, true);
+ Identifier identifier = Identifier.create(databaseName, "table");
+ catalog.createTable(
+ identifier,
+ Schema.newBuilder()
+ .option(METASTORE_PARTITIONED_TABLE.key(), "true")
+ .column("col", DataTypes.INT())
+ .column("dt", DataTypes.STRING())
+ .partitionKeys("dt")
+ .build(),
+ true);
+
+ catalog.createPartitions(
+ identifier,
+ Arrays.asList(
+ Collections.singletonMap("dt", "20250101"),
+ Collections.singletonMap("dt", "20250102")));
+
+ // hive catalog list partitions from filesystem, so here return empty.
+ assertThat(catalog.listPartitions(identifier)).isEmpty();
+ }
}