This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 658c61922 [flink] Procedure implementation class supports automatic discovery. (#2108)
658c61922 is described below
commit 658c6192210c93dc681f9230824f9888155ad43d
Author: Kerwin <[email protected]>
AuthorDate: Tue Oct 10 16:29:41 2023 +0800
[flink] Procedure implementation class supports automatic discovery. (#2108)
---
.../flink/procedure/CompactDatabaseProcedure.java | 12 +++---
.../paimon/flink/procedure/CompactProcedure.java | 12 +++---
.../paimon/flink/procedure/CreateTagProcedure.java | 11 ++---
.../paimon/flink/procedure/DeleteTagProcedure.java | 11 ++---
.../flink/procedure/DropPartitionProcedure.java | 11 ++---
.../paimon/flink/procedure/MergeIntoProcedure.java | 12 +++---
.../paimon/flink/procedure/ProcedureBase.java | 8 ++--
.../paimon/flink/procedure/ProcedureUtil.java | 49 +++++++---------------
.../flink/procedure/ResetConsumerProcedure.java | 11 ++---
.../flink/procedure/RollbackToProcedure.java | 11 ++---
.../services/org.apache.paimon.factories.Factory | 10 +++++
11 files changed, 78 insertions(+), 80 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 10b06931a..3d6189902 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.CompactDatabaseAction;
import org.apache.paimon.utils.StringUtils;
@@ -56,11 +55,7 @@ import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKe
*/
public class CompactDatabaseProcedure extends ProcedureBase {
- public static final String NAME = "compact_database";
-
- public CompactDatabaseProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "compact_database";
public String[] call(ProcedureContext procedureContext) throws Exception {
return call(procedureContext, "");
@@ -118,4 +113,9 @@ public class CompactDatabaseProcedure extends ProcedureBase {
return execute(procedureContext, action, "Compact database job");
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 4ab04dc2b..1ae8fdcf6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.CompactAction;
@@ -50,11 +49,7 @@ import java.util.Map;
*/
public class CompactProcedure extends ProcedureBase {
- public static final String NAME = "compact";
-
- public CompactProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "compact";
public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
return call(procedureContext, tableId, "");
@@ -123,4 +118,9 @@ public class CompactProcedure extends ProcedureBase {
return execute(procedureContext, action, jobName);
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 0a672046d..ac8bcff0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -33,11 +33,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
*/
public class CreateTagProcedure extends ProcedureBase {
- public static final String NAME = "create_tag";
-
- public CreateTagProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "create_tag";
public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
@@ -47,4 +43,9 @@ public class CreateTagProcedure extends ProcedureBase {
return new String[] {"Success"};
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
index 83a99e7a3..3991f5019 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -33,11 +33,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
*/
public class DeleteTagProcedure extends ProcedureBase {
- public static final String NAME = "delete_tag";
-
- public DeleteTagProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "delete_tag";
public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
@@ -46,4 +42,9 @@ public class DeleteTagProcedure extends ProcedureBase {
return new String[] {"Success"};
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 26d8f2919..c443bc4c2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -39,11 +39,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
*/
public class DropPartitionProcedure extends ProcedureBase {
- public static final String NAME = "drop_partition";
-
- public DropPartitionProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "drop_partition";
public String[] call(
ProcedureContext procedureContext, String tableId, String... partitionStrings)
@@ -58,4 +54,9 @@ public class DropPartitionProcedure extends ProcedureBase {
return new String[] {"Success"};
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index 6563595e6..6c399fd79 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MergeIntoAction;
import org.apache.paimon.flink.action.MergeIntoActionFactory;
@@ -69,11 +68,7 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
*/
public class MergeIntoProcedure extends ProcedureBase {
- public static final String NAME = "merge_into";
-
- public MergeIntoProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "merge_into";
public String[] call(
ProcedureContext procedureContext,
@@ -189,4 +184,9 @@ public class MergeIntoProcedure extends ProcedureBase {
throw new UnsupportedOperationException("Unknown merge action: " + mergeAction);
}
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index 1faa6445e..82f869f28 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.factories.Factory;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.utils.StringUtils;
@@ -41,12 +42,13 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
/** Base implementation for flink {@link Procedure}. */
-public class ProcedureBase implements Procedure {
+public abstract class ProcedureBase implements Procedure, Factory {
- protected final Catalog catalog;
+ protected Catalog catalog;
- ProcedureBase(Catalog catalog) {
+ ProcedureBase withCatalog(Catalog catalog) {
this.catalog = catalog;
+ return this;
}
protected List<Map<String, String>> getPartitions(String... partitionStrings) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index 98fbcda5f..1e06a0a5f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -19,10 +19,11 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.factories.FactoryException;
+import org.apache.paimon.factories.FactoryUtil;
import org.apache.flink.table.procedures.Procedure;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -32,43 +33,23 @@ public class ProcedureUtil {
private ProcedureUtil() {}
- private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
-
- static {
- SYSTEM_PROCEDURES.add(CompactProcedure.NAME);
- SYSTEM_PROCEDURES.add(CompactDatabaseProcedure.NAME);
- SYSTEM_PROCEDURES.add(CreateTagProcedure.NAME);
- SYSTEM_PROCEDURES.add(DeleteTagProcedure.NAME);
- SYSTEM_PROCEDURES.add(DropPartitionProcedure.NAME);
- SYSTEM_PROCEDURES.add(MergeIntoProcedure.NAME);
- SYSTEM_PROCEDURES.add(ResetConsumerProcedure.NAME);
- SYSTEM_PROCEDURES.add(RollbackToProcedure.NAME);
- }
-
public static List<String> listProcedures() {
- return Collections.unmodifiableList(SYSTEM_PROCEDURES);
+ return Collections.unmodifiableList(
+ FactoryUtil.discoverIdentifiers(
+ ProcedureBase.class.getClassLoader(), ProcedureBase.class));
}
public static Optional<Procedure> getProcedure(Catalog catalog, String procedureName) {
- switch (procedureName) {
- case CompactProcedure.NAME:
- return Optional.of(new CompactProcedure(catalog));
- case CompactDatabaseProcedure.NAME:
- return Optional.of(new CompactDatabaseProcedure(catalog));
- case CreateTagProcedure.NAME:
- return Optional.of(new CreateTagProcedure(catalog));
- case DeleteTagProcedure.NAME:
- return Optional.of(new DeleteTagProcedure(catalog));
- case DropPartitionProcedure.NAME:
- return Optional.of(new DropPartitionProcedure(catalog));
- case MergeIntoProcedure.NAME:
- return Optional.of(new MergeIntoProcedure(catalog));
- case ResetConsumerProcedure.NAME:
- return Optional.of(new ResetConsumerProcedure(catalog));
- case RollbackToProcedure.NAME:
- return Optional.of(new RollbackToProcedure(catalog));
- default:
- return Optional.empty();
+ try {
+ ProcedureBase procedure =
+ FactoryUtil.discoverFactory(
+ ProcedureBase.class.getClassLoader(),
+ ProcedureBase.class,
+ procedureName)
+ .withCatalog(catalog);
+ return Optional.of(procedure);
+ } catch (FactoryException e) {
+ return Optional.empty();
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 8b6b4ee5b..47c2d4c86 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -39,11 +39,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
*/
public class ResetConsumerProcedure extends ProcedureBase {
- public static final String NAME = "reset_consumer";
-
- public ResetConsumerProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "reset_consumer";
public String[] call(
ProcedureContext procedureContext,
@@ -70,4 +66,9 @@ public class ResetConsumerProcedure extends ProcedureBase {
return new String[] {"Success"};
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index a59dd238f..38753bd74 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -37,11 +37,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
*/
public class RollbackToProcedure extends ProcedureBase {
- public static final String NAME = "rollback_to";
-
- public RollbackToProcedure(Catalog catalog) {
- super(catalog);
- }
+ public static final String IDENTIFIER = "rollback_to";
public String[] call(ProcedureContext procedureContext, String tableId, long snapshotId)
throws Catalog.TableNotExistException {
@@ -58,4 +54,9 @@ public class RollbackToProcedure extends ProcedureBase {
return new String[] {"Success"};
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 847233822..1196880bf 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -23,3 +23,13 @@ org.apache.paimon.flink.action.RollbackToActionFactory
org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
+
+### procedure factories
+org.apache.paimon.flink.procedure.CompactDatabaseProcedure
+org.apache.paimon.flink.procedure.CompactProcedure
+org.apache.paimon.flink.procedure.CreateTagProcedure
+org.apache.paimon.flink.procedure.DeleteTagProcedure
+org.apache.paimon.flink.procedure.DropPartitionProcedure
+org.apache.paimon.flink.procedure.MergeIntoProcedure
+org.apache.paimon.flink.procedure.ResetConsumerProcedure
+org.apache.paimon.flink.procedure.RollbackToProcedure