This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 082edcf95d [core] Introduce supportsPartitionModification to Catalog
(#7228)
082edcf95d is described below
commit 082edcf95dcf4f49b11f6b25d181eb0cb7e2aeed
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Feb 6 16:06:16 2026 +0800
[core] Introduce supportsPartitionModification to Catalog (#7228)
See `RESTCatalogTest.testPartitionExpire`: partition expiration should use
the commit user prefix.
This PR fixes that. The refactoring also improves the semantic clarity of
the code: the name PartitionModification more accurately describes the
responsibility of the interface, which is specifically designed to handle
partition modification operations rather than general partition
handling.
---
.../java/org/apache/paimon/AbstractFileStore.java | 29 ++++++++--------
.../org/apache/paimon/catalog/AbstractCatalog.java | 23 +++----------
.../java/org/apache/paimon/catalog/Catalog.java | 39 ++++++++++++++++++----
.../org/apache/paimon/catalog/CatalogUtils.java | 3 +-
.../org/apache/paimon/catalog/DelegateCatalog.java | 5 +++
.../metastore/AddPartitionCommitCallback.java | 13 ++++----
.../paimon/metastore/AddPartitionTagCallback.java | 15 +++++----
.../apache/paimon/operation/PartitionExpire.java | 16 ++++-----
.../partition/actions/AddDonePartitionAction.java | 12 +++----
.../actions/MarkPartitionDoneEventAction.java | 14 ++++----
.../partition/actions/PartitionMarkDoneAction.java | 33 ++++++++++++++----
.../java/org/apache/paimon/rest/RESTCatalog.java | 25 ++------------
.../apache/paimon/table/CatalogEnvironment.java | 26 ++++++++++++---
...artitionHandler.java => PartitionMarkDone.java} | 35 +++----------------
...tionHandler.java => PartitionModification.java} | 17 +++-------
.../paimon/utils/PartitionStatisticsReporter.java | 15 +++++----
.../paimon/operation/PartitionExpireTest.java | 16 ++++-----
.../org/apache/paimon/rest/RESTCatalogServer.java | 1 +
.../org/apache/paimon/rest/RESTCatalogTest.java | 27 +++++++++++++++
.../utils/PartitionStatisticsReporterTest.java | 29 +++++++---------
.../sink/listener/ReportPartStatsListener.java | 9 ++---
.../sink/listener/AddDonePartitionActionTest.java | 12 +++----
.../java/org/apache/paimon/hive/HiveCatalog.java | 5 +++
.../paimon/spark/PaimonPartitionManagement.scala | 16 ++++-----
.../apache/paimon/spark/write/WriteHelper.scala | 4 +--
25 files changed, 231 insertions(+), 208 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index eea68990c7..3c4ef3168a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -56,7 +56,7 @@ import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitPreCallback;
@@ -385,10 +385,11 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
List<CommitCallback> callbacks = new ArrayList<>();
if (options.partitionedTableInMetastore() &&
!schema.partitionKeys().isEmpty()) {
- PartitionHandler partitionHandler =
catalogEnvironment.partitionHandler();
- if (partitionHandler != null) {
+ PartitionModification partitionModification =
+ catalogEnvironment.partitionModification();
+ if (partitionModification != null) {
callbacks.add(
- new AddPartitionCommitCallback(partitionHandler,
partitionComputer()));
+ new AddPartitionCommitCallback(partitionModification,
partitionComputer()));
}
}
@@ -396,12 +397,13 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
if (options.tagToPartitionField() != null
&& tagPreview != null
&& schema.partitionKeys().isEmpty()) {
- PartitionHandler partitionHandler =
catalogEnvironment.partitionHandler();
- if (partitionHandler != null) {
+ PartitionModification partitionModification =
+ catalogEnvironment.partitionModification();
+ if (partitionModification != null) {
TagPreviewCommitCallback callback =
new TagPreviewCommitCallback(
new AddPartitionTagCallback(
- partitionHandler,
options.tagToPartitionField()),
+ partitionModification,
options.tagToPartitionField()),
tagPreview);
callbacks.add(callback);
}
@@ -447,9 +449,9 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
Duration expirationTime,
Duration checkInterval,
PartitionExpireStrategy expireStrategy) {
- PartitionHandler partitionHandler = null;
+ PartitionModification partitionModification = null;
if (options.partitionedTableInMetastore()) {
- partitionHandler = catalogEnvironment.partitionHandler();
+ partitionModification = catalogEnvironment.partitionModification();
}
return new PartitionExpire(
@@ -458,7 +460,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
expireStrategy,
newScan(),
newCommit(commitUser, table),
- partitionHandler,
+ partitionModification,
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum(),
options.partitionExpireBatchSize());
@@ -480,9 +482,10 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
String partitionField = options.tagToPartitionField();
if (partitionField != null) {
- PartitionHandler partitionHandler =
catalogEnvironment.partitionHandler();
- if (partitionHandler != null) {
- callbacks.add(new AddPartitionTagCallback(partitionHandler,
partitionField));
+ PartitionModification partitionModification =
+ catalogEnvironment.partitionModification();
+ if (partitionModification != null) {
+ callbacks.add(new
AddPartitionTagCallback(partitionModification, partitionField));
}
}
if (options.tagCreateSuccessFile()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index bfb608dcdd..740b52e7a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -40,7 +40,6 @@ import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
-import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.SnapshotNotExistException;
@@ -582,29 +581,15 @@ public abstract class AbstractCatalog implements Catalog {
}
@Override
- public TableQueryAuthResult authTableQuery(Identifier identifier,
List<String> select) {
- throw new UnsupportedOperationException();
+ public boolean supportsPartitionModification() {
+ return false;
}
@Override
- public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {}
-
- @Override
- public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {
- Table table = getTable(identifier);
- try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
- commit.truncatePartitions(partitions);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ public TableQueryAuthResult authTableQuery(Identifier identifier,
List<String> select) {
+ throw new UnsupportedOperationException();
}
- @Override
- public void alterPartitions(Identifier identifier,
List<PartitionStatistics> partitions)
- throws TableNotExistException {}
-
@Override
public List<String> listFunctions(String databaseName) {
return Collections.emptyList();
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 71467255c4..192dbd33ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -32,6 +32,7 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewChange;
@@ -869,6 +870,25 @@ public interface Catalog extends AutoCloseable {
// ==================== Partition Modifications ==========================
+ /**
+ * Whether this catalog supports partition modification for tables.
+ *
+ * <p>If not, following methods will do nothing:
+ *
+ * <ul>
+ * <li>{@link #createPartitions(Identifier, List)}.
+ * <li>{@link #alterPartitions(Identifier, List)}.
+ * </ul>
+ *
+ * <p>If not, following method will be exactly the same as directly using
{@link
+ * BatchTableCommit#truncatePartitions}:
+ *
+ * <ul>
+ * <li>{@link #dropPartitions(Identifier, List)}.
+ * </ul>
+ */
+ boolean supportsPartitionModification();
+
/**
* Create partitions of the specify table. Ignore existing partitions.
*
@@ -876,8 +896,8 @@ public interface Catalog extends AutoCloseable {
* @param partitions partitions to be created
* @throws TableNotExistException if the table does not exist
*/
- void createPartitions(Identifier identifier, List<Map<String, String>>
partitions)
- throws TableNotExistException;
+ default void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {}
/**
* Drop partitions of the specify table. Ignore non-existent partitions.
@@ -886,8 +906,15 @@ public interface Catalog extends AutoCloseable {
* @param partitions partitions to be deleted
* @throws TableNotExistException if the table does not exist
*/
- void dropPartitions(Identifier identifier, List<Map<String, String>>
partitions)
- throws TableNotExistException;
+ default void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ Table table = getTable(identifier);
+ try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(partitions);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
/**
* Alter partitions of the specify table. For non-existent partitions,
partitions will be
@@ -897,8 +924,8 @@ public interface Catalog extends AutoCloseable {
* @param partitions partitions to be altered
* @throws TableNotExistException if the table does not exist
*/
- void alterPartitions(Identifier identifier, List<PartitionStatistics>
partitions)
- throws TableNotExistException;
+ default void alterPartitions(Identifier identifier,
List<PartitionStatistics> partitions)
+ throws TableNotExistException {}
// ======================= Function methods ===============================
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 1dd9d6d64e..d766e114c0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -284,7 +284,8 @@ public class CatalogUtils {
isRestCatalog ? null : lockFactory,
isRestCatalog ? null : lockContext,
catalogContext,
- catalog.supportsVersionManagement());
+ catalog.supportsVersionManagement(),
+ catalog.supportsPartitionModification());
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table =
FileStoreTableFactory.create(dataFileIO.apply(path), path,
schema, catalogEnv);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 4f3405187f..623993444f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -281,6 +281,11 @@ public abstract class DelegateCatalog implements Catalog {
return wrapped.commitSnapshot(identifier, tableUuid, snapshot,
statistics);
}
+ @Override
+ public boolean supportsPartitionModification() {
+ return wrapped.supportsPartitionModification();
+ }
+
@Override
public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
throws TableNotExistException {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index cbf41f37af..5a2fd79ab1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -25,7 +25,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -51,12 +51,13 @@ public class AddPartitionCommitCallback implements
CommitCallback {
.softValues()
.build();
- private final PartitionHandler partitionHandler;
+ private final PartitionModification partitionModification;
private final InternalRowPartitionComputer partitionComputer;
public AddPartitionCommitCallback(
- PartitionHandler partitionHandler, InternalRowPartitionComputer
partitionComputer) {
- this.partitionHandler = partitionHandler;
+ PartitionModification partitionModification,
+ InternalRowPartitionComputer partitionComputer) {
+ this.partitionModification = partitionModification;
this.partitionComputer = partitionComputer;
}
@@ -92,7 +93,7 @@ public class AddPartitionCommitCallback implements
CommitCallback {
}
}
if (!newPartitions.isEmpty()) {
- partitionHandler.createPartitions(
+ partitionModification.createPartitions(
newPartitions.stream()
.map(partitionComputer::generatePartValues)
.collect(Collectors.toList()));
@@ -105,6 +106,6 @@ public class AddPartitionCommitCallback implements
CommitCallback {
@Override
public void close() throws Exception {
- partitionHandler.close();
+ partitionModification.close();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
index ca8c7b7690..89e7ee2ae4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -18,7 +18,7 @@
package org.apache.paimon.metastore;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.sink.TagCallback;
import java.util.Collections;
@@ -27,11 +27,12 @@ import java.util.LinkedHashMap;
/** A {@link TagCallback} to add newly created partitions to metastore. */
public class AddPartitionTagCallback implements TagCallback {
- private final PartitionHandler partitionHandler;
+ private final PartitionModification partitionModification;
private final String partitionField;
- public AddPartitionTagCallback(PartitionHandler partitionHandler, String
partitionField) {
- this.partitionHandler = partitionHandler;
+ public AddPartitionTagCallback(
+ PartitionModification partitionModification, String
partitionField) {
+ this.partitionModification = partitionModification;
this.partitionField = partitionField;
}
@@ -40,7 +41,7 @@ public class AddPartitionTagCallback implements TagCallback {
LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
partitionSpec.put(partitionField, tagName);
try {
-
partitionHandler.createPartitions(Collections.singletonList(partitionSpec));
+
partitionModification.createPartitions(Collections.singletonList(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -51,7 +52,7 @@ public class AddPartitionTagCallback implements TagCallback {
LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
partitionSpec.put(partitionField, tagName);
try {
-
partitionHandler.dropPartitions(Collections.singletonList(partitionSpec));
+
partitionModification.dropPartitions(Collections.singletonList(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -59,6 +60,6 @@ public class AddPartitionTagCallback implements TagCallback {
@Override
public void close() throws Exception {
- partitionHandler.close();
+ partitionModification.close();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index f1e4ae096e..d32161dc0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -24,7 +24,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -53,7 +53,7 @@ public class PartitionExpire {
private final Duration checkInterval;
private final FileStoreScan scan;
private final FileStoreCommit commit;
- @Nullable private final PartitionHandler partitionHandler;
+ @Nullable private final PartitionModification partitionModification;
private LocalDateTime lastCheck;
private final PartitionExpireStrategy strategy;
private final boolean endInputCheckPartitionExpire;
@@ -66,7 +66,7 @@ public class PartitionExpire {
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
- @Nullable PartitionHandler partitionHandler,
+ @Nullable PartitionModification partitionModification,
boolean endInputCheckPartitionExpire,
int maxExpireNum,
int expireBatchSize) {
@@ -75,7 +75,7 @@ public class PartitionExpire {
this.strategy = strategy;
this.scan = scan;
this.commit = commit;
- this.partitionHandler = partitionHandler;
+ this.partitionModification = partitionModification;
// Avoid the execution time of stream jobs from being too short and
preventing partition
// expiration
long rndSeconds = 0;
@@ -95,7 +95,7 @@ public class PartitionExpire {
PartitionExpireStrategy strategy,
FileStoreScan scan,
FileStoreCommit commit,
- @Nullable PartitionHandler partitionHandler,
+ @Nullable PartitionModification partitionModification,
int maxExpireNum,
int expireBatchSize) {
this(
@@ -104,7 +104,7 @@ public class PartitionExpire {
strategy,
scan,
commit,
- partitionHandler,
+ partitionModification,
false,
maxExpireNum,
expireBatchSize);
@@ -177,9 +177,9 @@ public class PartitionExpire {
private void doBatchExpire(
List<Map<String, String>> expiredBatchPartitions, long
commitIdentifier) {
- if (partitionHandler != null) {
+ if (partitionModification != null) {
try {
- partitionHandler.dropPartitions(expiredBatchPartitions);
+ partitionModification.dropPartitions(expiredBatchPartitions);
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
index 3d76a7d374..1dc800ac0c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
@@ -19,7 +19,7 @@
package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
@@ -33,10 +33,10 @@ import static
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFro
/** A {@link PartitionMarkDoneAction} which add ".done" partition. */
public class AddDonePartitionAction implements PartitionMarkDoneAction {
- private final PartitionHandler partitionHandler;
+ private final PartitionModification partitionModification;
- public AddDonePartitionAction(PartitionHandler partitionHandler) {
- this.partitionHandler = partitionHandler;
+ public AddDonePartitionAction(PartitionModification partitionModification)
{
+ this.partitionModification = partitionModification;
}
@Override
@@ -44,7 +44,7 @@ public class AddDonePartitionAction implements
PartitionMarkDoneAction {
LinkedHashMap<String, String> doneSpec =
extractPartitionSpecFromPath(new Path(partition));
Map.Entry<String, String> lastField = tailEntry(doneSpec);
doneSpec.put(lastField.getKey(), lastField.getValue() + ".done");
- partitionHandler.createPartitions(Collections.singletonList(doneSpec));
+
partitionModification.createPartitions(Collections.singletonList(doneSpec));
}
private Map.Entry<String, String> tailEntry(LinkedHashMap<String, String>
partitionSpec) {
@@ -54,7 +54,7 @@ public class AddDonePartitionAction implements
PartitionMarkDoneAction {
@Override
public void close() throws IOException {
try {
- partitionHandler.close();
+ partitionModification.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
index ae81c0dde1..39cfede4fa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
@@ -19,7 +19,7 @@
package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionMarkDone;
import java.io.IOException;
import java.util.Collections;
@@ -27,26 +27,26 @@ import java.util.LinkedHashMap;
import static
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
-/** A {@link PartitionMarkDoneAction} which add mark
"PartitionEventType.LOAD_DONE". */
+/** A {@link PartitionMarkDoneAction} which add done in metastore. */
public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction {
- private final PartitionHandler partitionHandler;
+ private final PartitionMarkDone partitionMarkDone;
- public MarkPartitionDoneEventAction(PartitionHandler partitionHandler) {
- this.partitionHandler = partitionHandler;
+ public MarkPartitionDoneEventAction(PartitionMarkDone partitionMarkDone) {
+ this.partitionMarkDone = partitionMarkDone;
}
@Override
public void markDone(String partition) throws Exception {
LinkedHashMap<String, String> partitionSpec =
extractPartitionSpecFromPath(new Path(partition));
-
partitionHandler.markDonePartitions(Collections.singletonList(partitionSpec));
+
partitionMarkDone.markDonePartitions(Collections.singletonList(partitionSpec));
}
@Override
public void close() throws IOException {
try {
- partitionHandler.close();
+ partitionMarkDone.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index 2c48ce8d9b..680aa13277 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -20,7 +20,8 @@ package org.apache.paimon.partition.actions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionMarkDone;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.utils.StringUtils;
import java.io.Closeable;
@@ -57,13 +58,13 @@ public interface PartitionMarkDoneAction extends Closeable {
case DONE_PARTITION:
instance =
new AddDonePartitionAction(
- createPartitionHandler(
+
createPartitionModification(
fileStoreTable,
options));
break;
case MARK_EVENT:
instance =
new MarkPartitionDoneEventAction(
- createPartitionHandler(
+ createPartitionMarkDone(
fileStoreTable,
options));
break;
case HTTP_REPORT:
@@ -100,18 +101,36 @@ public interface PartitionMarkDoneAction extends
Closeable {
}
}
- static PartitionHandler createPartitionHandler(FileStoreTable table,
CoreOptions options) {
- PartitionHandler partitionHandler =
table.catalogEnvironment().partitionHandler();
+ static PartitionModification createPartitionModification(
+ FileStoreTable table, CoreOptions options) {
+ PartitionModification partitionModification =
+ table.catalogEnvironment().partitionModification();
if
(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).contains("done-partition"))
{
checkNotNull(
- partitionHandler, "Cannot mark done partition for table
without metastore.");
+ partitionModification,
+ "Cannot mark done partition for table without metastore.");
checkArgument(
options.partitionedTableInMetastore(),
"Table should enable %s",
METASTORE_PARTITIONED_TABLE.key());
}
- return partitionHandler;
+ return partitionModification;
+ }
+
+ static PartitionMarkDone createPartitionMarkDone(FileStoreTable table,
CoreOptions options) {
+ PartitionMarkDone partitionMarkDone =
table.catalogEnvironment().partitionMarkDone();
+
+ if
(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).contains("mark-event"))
{
+ checkNotNull(
+ partitionMarkDone, "Cannot mark done partition for table
without metastore.");
+ checkArgument(
+ options.partitionedTableInMetastore(),
+ "Table should enable %s",
+ METASTORE_PARTITIONED_TABLE.key());
+ }
+
+ return partitionMarkDone;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index e5d7c0c0c7..46262601d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -59,7 +59,6 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
-import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotNotExistException;
@@ -693,28 +692,8 @@ public class RESTCatalog implements Catalog {
}
@Override
- public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {
- // partitions of the REST Catalog server are automatically calculated
and do not require
- // special creating.
- }
-
- @Override
- public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {
- Table table = getTable(identifier);
- try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
- commit.truncatePartitions(partitions);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void alterPartitions(Identifier identifier,
List<PartitionStatistics> partitions)
- throws TableNotExistException {
- // The partition statistics of the REST Catalog server are
automatically calculated and do
- // not require special reporting.
+ public boolean supportsPartitionModification() {
+ return false;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 8f68d3e04a..882ad952b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -52,6 +52,7 @@ public class CatalogEnvironment implements Serializable {
@Nullable private final CatalogLockContext lockContext;
@Nullable private final CatalogContext catalogContext;
private final boolean supportsVersionManagement;
+ private final boolean supportsPartitionModification;
public CatalogEnvironment(
@Nullable Identifier identifier,
@@ -60,7 +61,8 @@ public class CatalogEnvironment implements Serializable {
@Nullable CatalogLockFactory lockFactory,
@Nullable CatalogLockContext lockContext,
@Nullable CatalogContext catalogContext,
- boolean supportsVersionManagement) {
+ boolean supportsVersionManagement,
+ boolean supportsPartitionModification) {
this.identifier = identifier;
this.uuid = uuid;
this.catalogLoader = catalogLoader;
@@ -68,10 +70,11 @@ public class CatalogEnvironment implements Serializable {
this.lockContext = lockContext;
this.catalogContext = catalogContext;
this.supportsVersionManagement = supportsVersionManagement;
+ this.supportsPartitionModification = supportsPartitionModification;
}
public static CatalogEnvironment empty() {
- return new CatalogEnvironment(null, null, null, null, null, null,
false);
+ return new CatalogEnvironment(null, null, null, null, null, null,
false, false);
}
@Nullable
@@ -85,12 +88,24 @@ public class CatalogEnvironment implements Serializable {
}
@Nullable
- public PartitionHandler partitionHandler() {
+ public PartitionModification partitionModification() {
+ if (catalogLoader == null) {
+ return null;
+ }
+ if (!supportsPartitionModification) {
+ return null;
+ }
+ Catalog catalog = catalogLoader.load();
+ return PartitionModification.create(catalog, identifier);
+ }
+
+ @Nullable
+ public PartitionMarkDone partitionMarkDone() {
if (catalogLoader == null) {
return null;
}
Catalog catalog = catalogLoader.load();
- return PartitionHandler.create(catalog, identifier);
+ return PartitionMarkDone.create(catalog, identifier);
}
public boolean supportsVersionManagement() {
@@ -164,7 +179,8 @@ public class CatalogEnvironment implements Serializable {
lockFactory,
lockContext,
catalogContext,
- supportsVersionManagement);
+ supportsVersionManagement,
+ supportsPartitionModification);
}
public TableQueryAuth tableQueryAuth(CoreOptions options) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java
b/paimon-core/src/main/java/org/apache/paimon/table/PartitionMarkDone.java
similarity index 52%
copy from
paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java
copy to paimon-core/src/main/java/org/apache/paimon/table/PartitionMarkDone.java
index c93bfdab04..2c466ebcdd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PartitionMarkDone.java
@@ -20,45 +20,18 @@ package org.apache.paimon.table;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.partition.PartitionStatistics;
import java.util.List;
import java.util.Map;
-/** Handler to handle partitions. */
-public interface PartitionHandler extends AutoCloseable {
-
- void createPartitions(List<Map<String, String>> partitions)
- throws Catalog.TableNotExistException;
-
- void dropPartitions(List<Map<String, String>> partitions) throws
Catalog.TableNotExistException;
-
- void alterPartitions(List<PartitionStatistics> partitions)
- throws Catalog.TableNotExistException;
+/** Handler to mark done partitions. */
+public interface PartitionMarkDone extends AutoCloseable {
void markDonePartitions(List<Map<String, String>> partitions)
throws Catalog.TableNotExistException;
- static PartitionHandler create(Catalog catalog, Identifier identifier) {
- return new PartitionHandler() {
-
- @Override
- public void createPartitions(List<Map<String, String>> partitions)
- throws Catalog.TableNotExistException {
- catalog.createPartitions(identifier, partitions);
- }
-
- @Override
- public void dropPartitions(List<Map<String, String>> partitions)
- throws Catalog.TableNotExistException {
- catalog.dropPartitions(identifier, partitions);
- }
-
- @Override
- public void alterPartitions(List<PartitionStatistics> partitions)
- throws Catalog.TableNotExistException {
- catalog.alterPartitions(identifier, partitions);
- }
+ static PartitionMarkDone create(Catalog catalog, Identifier identifier) {
+ return new PartitionMarkDone() {
@Override
public void markDonePartitions(List<Map<String, String>>
partitions)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java b/paimon-core/src/main/java/org/apache/paimon/table/PartitionModification.java
similarity index 79%
rename from paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java
rename to paimon-core/src/main/java/org/apache/paimon/table/PartitionModification.java
index c93bfdab04..b3850bf9be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PartitionModification.java
@@ -25,8 +25,8 @@ import org.apache.paimon.partition.PartitionStatistics;
import java.util.List;
import java.util.Map;
-/** Handler to handle partitions. */
-public interface PartitionHandler extends AutoCloseable {
+/** Handler to partitions modification. */
+public interface PartitionModification extends AutoCloseable {
void createPartitions(List<Map<String, String>> partitions)
throws Catalog.TableNotExistException;
@@ -36,11 +36,8 @@ public interface PartitionHandler extends AutoCloseable {
void alterPartitions(List<PartitionStatistics> partitions)
throws Catalog.TableNotExistException;
- void markDonePartitions(List<Map<String, String>> partitions)
- throws Catalog.TableNotExistException;
-
- static PartitionHandler create(Catalog catalog, Identifier identifier) {
- return new PartitionHandler() {
+ static PartitionModification create(Catalog catalog, Identifier
identifier) {
+ return new PartitionModification() {
@Override
public void createPartitions(List<Map<String, String>> partitions)
@@ -60,12 +57,6 @@ public interface PartitionHandler extends AutoCloseable {
catalog.alterPartitions(identifier, partitions);
}
- @Override
- public void markDonePartitions(List<Map<String, String>>
partitions)
- throws Catalog.TableNotExistException {
- catalog.markDonePartitions(identifier, partitions);
- }
-
@Override
public void close() throws Exception {
catalog.close();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
index da8d47ba8c..c34bdec5b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
@@ -23,7 +23,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -46,14 +46,15 @@ public class PartitionStatisticsReporter implements
Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(PartitionStatisticsReporter.class);
- private final PartitionHandler partitionHandler;
+ private final PartitionModification partitionModification;
private final SnapshotReader snapshotReader;
private final SnapshotManager snapshotManager;
- public PartitionStatisticsReporter(FileStoreTable table, PartitionHandler
partitionHandler) {
- this.partitionHandler =
+ public PartitionStatisticsReporter(
+ FileStoreTable table, PartitionModification partitionModification)
{
+ this.partitionModification =
Preconditions.checkNotNull(
- partitionHandler, "the partition handler factory is
null");
+ partitionModification, "the partition handler factory
is null");
this.snapshotReader = table.newSnapshotReader();
this.snapshotManager = table.snapshotManager();
}
@@ -94,14 +95,14 @@ public class PartitionStatisticsReporter implements
Closeable {
modifyTimeMillis,
totalBuckets);
LOG.info("alter partition {} with statistic {}.", partitionSpec,
partitionStats);
-
partitionHandler.alterPartitions(Collections.singletonList(partitionStats));
+
partitionModification.alterPartitions(Collections.singletonList(partitionStats));
}
}
@Override
public void close() throws IOException {
try {
- partitionHandler.close();
+ partitionModification.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 9f3903bd9b..4eef95b55b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -36,7 +36,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -104,8 +104,8 @@ public class PartitionExpireTest {
String branchName = CoreOptions.branch(options.toMap());
TableSchema tableSchema = new SchemaManager(fileIO, tablePath,
branchName).latest().get();
deletedPartitions = new ArrayList<>();
- PartitionHandler partitionHandler =
- new PartitionHandler() {
+ PartitionModification partitionModification =
+ new PartitionModification() {
@Override
public void createPartitions(List<Map<String, String>>
partitions)
throws Catalog.TableNotExistException {}
@@ -128,20 +128,16 @@ public class PartitionExpireTest {
public void alterPartitions(List<PartitionStatistics>
partitions)
throws Catalog.TableNotExistException {}
- @Override
- public void markDonePartitions(List<Map<String, String>>
partitions)
- throws Catalog.TableNotExistException {}
-
@Override
public void close() throws Exception {}
};
CatalogEnvironment env =
- new CatalogEnvironment(null, null, null, null, null, null,
false) {
+ new CatalogEnvironment(null, null, null, null, null, null,
false, false) {
@Override
- public PartitionHandler partitionHandler() {
- return partitionHandler;
+ public PartitionModification partitionModification() {
+ return partitionModification;
}
};
table = FileStoreTableFactory.create(fileIO, path, tableSchema, env);
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 98d3974e16..98e1beddda 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -2589,6 +2589,7 @@ public class RESTCatalogServer {
catalog.lockFactory().orElse(null),
catalog.lockContext().orElse(null),
catalogContext,
+ false,
false);
Path path = new Path(schema.options().get(PATH.key()));
FileIO dataFileIO = catalog.fileIO();
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index f18c4752ab..2bf7d25a8c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -124,8 +124,12 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.COMMIT_USER_PREFIX;
+import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_STRATEGY;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
import static org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.OBJECT_TABLE;
@@ -1611,6 +1615,29 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
assertPagedPartitions(pagedPartitions, 1, partitionSpecs.get(1));
}
+ @Test
+ void testPartitionExpire() throws Exception {
+ // create table
+ Identifier identifier = Identifier.create("test_db",
"test_partition_expire");
+ Map<String, String> options = new HashMap<>();
+ options.put(PARTITION_EXPIRATION_STRATEGY.key(), "update-time");
+ options.put(PARTITION_EXPIRATION_TIME.key(), "1 ms");
+ options.put(END_INPUT_CHECK_PARTITION_EXPIRE.key(), "TRUE");
+ options.put(METASTORE_PARTITIONED_TABLE.key(), "TRUE");
+ createTable(identifier, options, Lists.newArrayList("col1"));
+
+ // write and expire table
+ Table table =
+ catalog.getTable(identifier)
+ .copy(singletonMap(COMMIT_USER_PREFIX.key(),
"my_user"));
+ batchWrite(table, Arrays.asList(1, 2, 3));
+ Thread.sleep(1000);
+ batchWrite(table, Arrays.asList(4, 5, 6));
+ Snapshot snapshot = table.latestSnapshot().get();
+
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+ assertThat(snapshot.commitUser()).startsWith("my_user");
+ }
+
@Test
void testRefreshFileIO() throws Exception {
this.catalog = newRestCatalogWithDataToken();
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
index 02e81bdfd0..53b5f37592 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
@@ -27,7 +27,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
@@ -91,8 +91,8 @@ public class PartitionStatisticsReporterTest {
AtomicBoolean closed = new AtomicBoolean(false);
Map<String, PartitionStatistics> partitionParams = Maps.newHashMap();
- PartitionHandler partitionHandler =
- new PartitionHandler() {
+ PartitionModification partitionModification =
+ new PartitionModification() {
@Override
public void createPartitions(List<Map<String, String>>
partitions) {
@@ -104,22 +104,17 @@ public class PartitionStatisticsReporterTest {
throw new UnsupportedOperationException();
}
- @Override
- public void markDonePartitions(List<Map<String, String>>
partitions) {
- throw new UnsupportedOperationException();
- }
-
@Override
public void alterPartitions(List<PartitionStatistics>
partitions) {
partitions.forEach(
- partition -> {
- partitionParams.put(
-
PartitionPathUtils.generatePartitionPath(
- partition.spec(),
-
table.rowType().project(table.partitionKeys()),
- false),
- partition);
- });
+ partition ->
+ partitionParams.put(
+
PartitionPathUtils.generatePartitionPath(
+ partition.spec(),
+ table.rowType()
+
.project(table.partitionKeys()),
+ false),
+ partition));
}
@Override
@@ -129,7 +124,7 @@ public class PartitionStatisticsReporterTest {
};
PartitionStatisticsReporter action =
- new PartitionStatisticsReporter(table, partitionHandler);
+ new PartitionStatisticsReporter(table, partitionModification);
long time = 1729598544974L;
action.report("c1=a/", time);
assertThat(partitionParams).containsKey("c1=a/");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
index 488b54cbd7..c4db738cdf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
@@ -22,7 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -158,9 +158,10 @@ public class ReportPartStatsListener implements
CommitListener {
return Optional.empty();
}
- PartitionHandler partitionHandler =
table.catalogEnvironment().partitionHandler();
+ PartitionModification partitionModification =
+ table.catalogEnvironment().partitionModification();
- if (partitionHandler == null) {
+ if (partitionModification == null) {
return Optional.empty();
}
@@ -174,7 +175,7 @@ public class ReportPartStatsListener implements
CommitListener {
return Optional.of(
new ReportPartStatsListener(
partitionComputer,
- new PartitionStatisticsReporter(table,
partitionHandler),
+ new PartitionStatisticsReporter(table,
partitionModification),
stateStore,
isRestored,
options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
index af58800e04..6b5693dd05 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.partition.actions.AddDonePartitionAction;
-import org.apache.paimon.table.PartitionHandler;
+import org.apache.paimon.table.PartitionModification;
import org.junit.jupiter.api.Test;
@@ -41,8 +41,8 @@ class AddDonePartitionActionTest {
public void test() throws Exception {
AtomicBoolean closed = new AtomicBoolean(false);
Set<String> donePartitions = new HashSet<>();
- PartitionHandler partitionHandler =
- new PartitionHandler() {
+ PartitionModification partitionModification =
+ new PartitionModification() {
@Override
public void close() throws Exception {
@@ -66,13 +66,9 @@ class AddDonePartitionActionTest {
@Override
public void alterPartitions(List<PartitionStatistics>
partitions)
throws Catalog.TableNotExistException {}
-
- @Override
- public void markDonePartitions(List<Map<String, String>>
partitions)
- throws Catalog.TableNotExistException {}
};
- AddDonePartitionAction action = new
AddDonePartitionAction(partitionHandler);
+ AddDonePartitionAction action = new
AddDonePartitionAction(partitionModification);
// test normal
action.markDone("dt=20201202");
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index cd295133b9..38ecd9372d 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -353,6 +353,11 @@ public class HiveCatalog extends AbstractCatalog {
}
}
+ @Override
+ public boolean supportsPartitionModification() {
+ return true;
+ }
+
private boolean metastorePartitioned(TableSchema schema) {
CoreOptions options = CoreOptions.fromMap(schema.options());
return (!schema.partitionKeys().isEmpty() &&
options.partitionedTableInMetastore())
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 27fa93458b..312a340397 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -67,12 +67,12 @@ trait PaimonPartitionManagement extends
SupportsAtomicPartitionManagement {
table match {
case fileStoreTable: FileStoreTable =>
val partitions = toPaimonPartitions(rows).toSeq.asJava
- val partitionHandler =
fileStoreTable.catalogEnvironment().partitionHandler()
- if (partitionHandler != null) {
+ val partitionModification =
fileStoreTable.catalogEnvironment().partitionModification()
+ if (partitionModification != null) {
try {
- partitionHandler.dropPartitions(partitions)
+ partitionModification.dropPartitions(partitions)
} finally {
- partitionHandler.close()
+ partitionModification.close()
}
} else {
val commit = fileStoreTable.newBatchWriteBuilder().newCommit()
@@ -151,14 +151,14 @@ trait PaimonPartitionManagement extends
SupportsAtomicPartitionManagement {
table match {
case fileStoreTable: FileStoreTable =>
val partitions = toPaimonPartitions(rows)
- val partitionHandler =
fileStoreTable.catalogEnvironment().partitionHandler()
- if (partitionHandler != null) {
+ val partitionModification =
fileStoreTable.catalogEnvironment().partitionModification()
+ if (partitionModification != null) {
try {
if (fileStoreTable.coreOptions().partitionedTableInMetastore()) {
- partitionHandler.createPartitions(partitions.toSeq.asJava)
+ partitionModification.createPartitions(partitions.toSeq.asJava)
}
} finally {
- partitionHandler.close()
+ partitionModification.close()
}
}
case _ =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala
index bc07a310c0..899cfd97c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala
@@ -52,7 +52,7 @@ trait WriteHelper extends Logging {
config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis
<= 0 ||
table.partitionKeys.isEmpty ||
!coreOptions.partitionedTableInMetastore ||
- table.catalogEnvironment.partitionHandler() == null
+ table.catalogEnvironment.partitionModification() == null
) {
return
}
@@ -65,7 +65,7 @@ trait WriteHelper extends Logging {
)
val hmsReporter = new PartitionStatisticsReporter(
table,
- table.catalogEnvironment.partitionHandler()
+ table.catalogEnvironment.partitionModification()
)
val partitions = messages.map(_.partition()).distinct