This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 658c61922 [flink] Procedure implementation class supports automatic discovery. (#2108)
658c61922 is described below

commit 658c6192210c93dc681f9230824f9888155ad43d
Author: Kerwin <[email protected]>
AuthorDate: Tue Oct 10 16:29:41 2023 +0800

    [flink] Procedure implementation class supports automatic discovery. (#2108)
---
 .../flink/procedure/CompactDatabaseProcedure.java  | 12 +++---
 .../paimon/flink/procedure/CompactProcedure.java   | 12 +++---
 .../paimon/flink/procedure/CreateTagProcedure.java | 11 ++---
 .../paimon/flink/procedure/DeleteTagProcedure.java | 11 ++---
 .../flink/procedure/DropPartitionProcedure.java    | 11 ++---
 .../paimon/flink/procedure/MergeIntoProcedure.java | 12 +++---
 .../paimon/flink/procedure/ProcedureBase.java      |  8 ++--
 .../paimon/flink/procedure/ProcedureUtil.java      | 49 +++++++---------------
 .../flink/procedure/ResetConsumerProcedure.java    | 11 ++---
 .../flink/procedure/RollbackToProcedure.java       | 11 ++---
 .../services/org.apache.paimon.factories.Factory   | 10 +++++
 11 files changed, 78 insertions(+), 80 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 10b06931a..3d6189902 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.action.CompactDatabaseAction;
 import org.apache.paimon.utils.StringUtils;
 
@@ -56,11 +55,7 @@ import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKe
  */
 public class CompactDatabaseProcedure extends ProcedureBase {
 
-    public static final String NAME = "compact_database";
-
-    public CompactDatabaseProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "compact_database";
 
     public String[] call(ProcedureContext procedureContext) throws Exception {
         return call(procedureContext, "");
@@ -118,4 +113,9 @@ public class CompactDatabaseProcedure extends ProcedureBase {
 
         return execute(procedureContext, action, "Compact database job");
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 4ab04dc2b..1ae8fdcf6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.ActionFactory;
 import org.apache.paimon.flink.action.CompactAction;
@@ -50,11 +49,7 @@ import java.util.Map;
  */
 public class CompactProcedure extends ProcedureBase {
 
-    public static final String NAME = "compact";
-
-    public CompactProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "compact";
 
     public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
         return call(procedureContext, tableId, "");
@@ -123,4 +118,9 @@ public class CompactProcedure extends ProcedureBase {
 
         return execute(procedureContext, action, jobName);
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 0a672046d..ac8bcff0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -33,11 +33,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
  */
 public class CreateTagProcedure extends ProcedureBase {
 
-    public static final String NAME = "create_tag";
-
-    public CreateTagProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "create_tag";
 
     public String[] call(
             ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
@@ -47,4 +43,9 @@ public class CreateTagProcedure extends ProcedureBase {
 
         return new String[] {"Success"};
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
index 83a99e7a3..3991f5019 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -33,11 +33,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
  */
 public class DeleteTagProcedure extends ProcedureBase {
 
-    public static final String NAME = "delete_tag";
-
-    public DeleteTagProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "delete_tag";
 
     public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
             throws Catalog.TableNotExistException {
@@ -46,4 +42,9 @@ public class DeleteTagProcedure extends ProcedureBase {
 
         return new String[] {"Success"};
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 26d8f2919..c443bc4c2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -39,11 +39,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
  */
 public class DropPartitionProcedure extends ProcedureBase {
 
-    public static final String NAME = "drop_partition";
-
-    public DropPartitionProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "drop_partition";
 
     public String[] call(
             ProcedureContext procedureContext, String tableId, String... partitionStrings)
@@ -58,4 +54,9 @@ public class DropPartitionProcedure extends ProcedureBase {
 
         return new String[] {"Success"};
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index 6563595e6..6c399fd79 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.MergeIntoAction;
 import org.apache.paimon.flink.action.MergeIntoActionFactory;
@@ -69,11 +68,7 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
  */
 public class MergeIntoProcedure extends ProcedureBase {
 
-    public static final String NAME = "merge_into";
-
-    public MergeIntoProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "merge_into";
 
     public String[] call(
             ProcedureContext procedureContext,
@@ -189,4 +184,9 @@ public class MergeIntoProcedure extends ProcedureBase {
                 throw new UnsupportedOperationException("Unknown merge action: " + mergeAction);
         }
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index 1faa6445e..82f869f28 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.factories.Factory;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.utils.StringUtils;
@@ -41,12 +42,13 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
 import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
 
 /** Base implementation for flink {@link Procedure}. */
-public class ProcedureBase implements Procedure {
+public abstract class ProcedureBase implements Procedure, Factory {
 
-    protected final Catalog catalog;
+    protected Catalog catalog;
 
-    ProcedureBase(Catalog catalog) {
+    ProcedureBase withCatalog(Catalog catalog) {
         this.catalog = catalog;
+        return this;
     }
 
     protected List<Map<String, String>> getPartitions(String... partitionStrings) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index 98fbcda5f..1e06a0a5f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -19,10 +19,11 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.factories.FactoryException;
+import org.apache.paimon.factories.FactoryUtil;
 
 import org.apache.flink.table.procedures.Procedure;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -32,43 +33,23 @@ public class ProcedureUtil {
 
     private ProcedureUtil() {}
 
-    private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
-
-    static {
-        SYSTEM_PROCEDURES.add(CompactProcedure.NAME);
-        SYSTEM_PROCEDURES.add(CompactDatabaseProcedure.NAME);
-        SYSTEM_PROCEDURES.add(CreateTagProcedure.NAME);
-        SYSTEM_PROCEDURES.add(DeleteTagProcedure.NAME);
-        SYSTEM_PROCEDURES.add(DropPartitionProcedure.NAME);
-        SYSTEM_PROCEDURES.add(MergeIntoProcedure.NAME);
-        SYSTEM_PROCEDURES.add(ResetConsumerProcedure.NAME);
-        SYSTEM_PROCEDURES.add(RollbackToProcedure.NAME);
-    }
-
     public static List<String> listProcedures() {
-        return Collections.unmodifiableList(SYSTEM_PROCEDURES);
+        return Collections.unmodifiableList(
+                FactoryUtil.discoverIdentifiers(
+                        ProcedureBase.class.getClassLoader(), ProcedureBase.class));
     }
 
     public static Optional<Procedure> getProcedure(Catalog catalog, String procedureName) {
-        switch (procedureName) {
-            case CompactProcedure.NAME:
-                return Optional.of(new CompactProcedure(catalog));
-            case CompactDatabaseProcedure.NAME:
-                return Optional.of(new CompactDatabaseProcedure(catalog));
-            case CreateTagProcedure.NAME:
-                return Optional.of(new CreateTagProcedure(catalog));
-            case DeleteTagProcedure.NAME:
-                return Optional.of(new DeleteTagProcedure(catalog));
-            case DropPartitionProcedure.NAME:
-                return Optional.of(new DropPartitionProcedure(catalog));
-            case MergeIntoProcedure.NAME:
-                return Optional.of(new MergeIntoProcedure(catalog));
-            case ResetConsumerProcedure.NAME:
-                return Optional.of(new ResetConsumerProcedure(catalog));
-            case RollbackToProcedure.NAME:
-                return Optional.of(new RollbackToProcedure(catalog));
-            default:
-                return Optional.empty();
+        try {
+            ProcedureBase procedure =
+                    FactoryUtil.discoverFactory(
+                                    ProcedureBase.class.getClassLoader(),
+                                    ProcedureBase.class,
+                                    procedureName)
+                            .withCatalog(catalog);
+            return Optional.of(procedure);
+        } catch (FactoryException e) {
+            return Optional.empty();
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 8b6b4ee5b..47c2d4c86 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -39,11 +39,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
  */
 public class ResetConsumerProcedure extends ProcedureBase {
 
-    public static final String NAME = "reset_consumer";
-
-    public ResetConsumerProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "reset_consumer";
 
     public String[] call(
             ProcedureContext procedureContext,
@@ -70,4 +66,9 @@ public class ResetConsumerProcedure extends ProcedureBase {
 
         return new String[] {"Success"};
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index a59dd238f..38753bd74 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -37,11 +37,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
  */
 public class RollbackToProcedure extends ProcedureBase {
 
-    public static final String NAME = "rollback_to";
-
-    public RollbackToProcedure(Catalog catalog) {
-        super(catalog);
-    }
+    public static final String IDENTIFIER = "rollback_to";
 
     public String[] call(ProcedureContext procedureContext, String tableId, long snapshotId)
             throws Catalog.TableNotExistException {
@@ -58,4 +54,9 @@ public class RollbackToProcedure extends ProcedureBase {
 
         return new String[] {"Success"};
     }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 847233822..1196880bf 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -23,3 +23,13 @@ org.apache.paimon.flink.action.RollbackToActionFactory
 org.apache.paimon.flink.action.CreateTagActionFactory
 org.apache.paimon.flink.action.DeleteTagActionFactory
 org.apache.paimon.flink.action.ResetConsumerActionFactory
+
+### procedure factories
+org.apache.paimon.flink.procedure.CompactDatabaseProcedure
+org.apache.paimon.flink.procedure.CompactProcedure
+org.apache.paimon.flink.procedure.CreateTagProcedure
+org.apache.paimon.flink.procedure.DeleteTagProcedure
+org.apache.paimon.flink.procedure.DropPartitionProcedure
+org.apache.paimon.flink.procedure.MergeIntoProcedure
+org.apache.paimon.flink.procedure.ResetConsumerProcedure
+org.apache.paimon.flink.procedure.RollbackToProcedure

Reply via email to