This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a66684c8 [flink] Place all the built-in procedures into sys database (#2140)
1a66684c8 is described below

commit 1a66684c8fca6733a3fed89a5f4f400e7ef10bd9
Author: yuzelin <[email protected]>
AuthorDate: Tue Oct 17 16:43:01 2023 +0800

    [flink] Place all the built-in procedures into sys database (#2140)
---
 .../main/java/org/apache/paimon/flink/FlinkCatalog.java    |  2 +-
 .../paimon/flink/procedure/CompactDatabaseProcedure.java   | 12 ++++++------
 .../apache/paimon/flink/procedure/CompactProcedure.java    |  6 +++---
 .../apache/paimon/flink/procedure/CreateTagProcedure.java  |  2 +-
 .../apache/paimon/flink/procedure/DeleteTagProcedure.java  |  2 +-
 .../paimon/flink/procedure/DropPartitionProcedure.java     |  2 +-
 .../apache/paimon/flink/procedure/MergeIntoProcedure.java  |  2 +-
 .../org/apache/paimon/flink/procedure/ProcedureUtil.java   |  8 ++++++--
 .../paimon/flink/procedure/ResetConsumerProcedure.java     |  4 ++--
 .../apache/paimon/flink/procedure/RollbackToProcedure.java |  4 ++--
 .../apache/paimon/flink/action/CompactActionITCase.java    |  2 +-
 .../paimon/flink/action/CompactDatabaseActionITCase.java   | 14 +++++++-------
 .../apache/paimon/flink/action/ConsumerActionITCase.java   |  5 +++--
 .../paimon/flink/action/DropPartitionActionITCase.java     |  5 +++--
 .../apache/paimon/flink/action/MergeIntoActionITCase.java  | 12 ++++++------
 .../apache/paimon/flink/action/RollbackToActionITCase.java | 13 ++++++++-----
 .../action/SortCompactActionForDynamicBucketITCase.java    |  2 +-
 .../action/SortCompactActionForUnawareBucketITCase.java    |  2 +-
 .../org/apache/paimon/flink/action/TagActionITCase.java    |  5 +++--
 19 files changed, 57 insertions(+), 47 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b14a6d97e..53916ce89 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -897,7 +897,7 @@ public class FlinkCatalog extends AbstractCatalog {
      */
     public Procedure getProcedure(ObjectPath procedurePath)
             throws ProcedureNotExistException, CatalogException {
-        return ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName())
+        return ProcedureUtil.getProcedure(catalog, procedurePath)
                 .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 3d6189902..a4cd48654 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -35,22 +35,22 @@ import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKe
  *  -- NOTE: use '' as placeholder for optional arguments
  *
  *  -- compact all databases
- *  CALL compact_database()
+ *  CALL sys.compact_database()
  *
  *  -- compact some databases (accept regular expression)
- *  CALL compact_database('includingDatabases')
+ *  CALL sys.compact_database('includingDatabases')
  *
  *  -- set compact mode
- *  CALL compact_database('includingDatabases', 'mode')
+ *  CALL sys.compact_database('includingDatabases', 'mode')
  *
  *  -- compact some tables (accept regular expression)
- *  CALL compact_database('includingDatabases', 'mode', 'includingTables')
+ *  CALL sys.compact_database('includingDatabases', 'mode', 'includingTables')
  *
  *  -- exclude some tables (accept regular expression)
- *  CALL compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
+ *  CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables')
  *
  *  -- set table options ('k=v,...')
- *  CALL compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
+ *  CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
  * </code></pre>
  */
 public class CompactDatabaseProcedure extends ProcedureBase {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 1ae8fdcf6..07bddf1db 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -37,13 +37,13 @@ import java.util.Map;
  *  -- NOTE: use '' as placeholder for optional arguments
  *
  *  -- compact a table (tableId should be 'database_name.table_name')
- *  CALL compact('tableId')
+ *  CALL sys.compact('tableId')
  *
  *  -- compact specific partitions ('pt1=A,pt2=a;pt1=B,pt2=b', ...)
- *  CALL compact('tableId', 'pt1=A,pt2=a;pt1=B,pt2=b')
+ *  CALL sys.compact('tableId', 'pt1=A,pt2=a;pt1=B,pt2=b')
  *
  *  -- compact a table with sorting
- *  CALL compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2', 'sink.parallelism=6')
+ *  CALL sys.compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2', 'sink.parallelism=6')
  *
  * </code></pre>
  */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index ac8bcff0f..9cd4067bc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
  * Create tag procedure. Usage:
  *
  * <pre><code>
- *  CALL create_tag('tableId', 'tagName', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', snapshotId)
  * </code></pre>
  */
 public class CreateTagProcedure extends ProcedureBase {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
index 3991f5019..931448937 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
  * Delete tag procedure. Usage:
  *
  * <pre><code>
- *  CALL delete_tag('tableId', 'tagName')
+ *  CALL sys.delete_tag('tableId', 'tagName')
  * </code></pre>
  */
 public class DeleteTagProcedure extends ProcedureBase {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index c443bc4c2..e38d04f72 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -34,7 +34,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  * Drop partition procedure. Usage:
  *
  * <pre><code>
- *  CALL drop_partition('tableId', 'partition1', 'partition2', ...)
+ *  CALL sys.drop_partition('tableId', 'partition1', 'partition2', ...)
  * </code></pre>
  */
 public class DropPartitionProcedure extends ProcedureBase {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index 6c399fd79..e2c14e0c3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -42,7 +42,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
  * Merge Into procedure. Usage:
  *
  * <pre><code>
- *  CALL merge_into(
+ *  CALL sys.merge_into(
  *      'targetTableId',
  *      'targetAlias',
  *      'sourceSqls', -- separate with ';'
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index 1e06a0a5f..a50bee25b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.factories.FactoryException;
 import org.apache.paimon.factories.FactoryUtil;
 
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.procedures.Procedure;
 
 import java.util.Collections;
@@ -39,13 +40,16 @@ public class ProcedureUtil {
                         ProcedureBase.class.getClassLoader(), ProcedureBase.class));
     }
 
-    public static Optional<Procedure> getProcedure(Catalog catalog, String procedureName) {
+    public static Optional<Procedure> getProcedure(Catalog catalog, ObjectPath procedurePath) {
+        if (!Catalog.SYSTEM_DATABASE_NAME.equals(procedurePath.getDatabaseName())) {
+            return Optional.empty();
+        }
         try {
             ProcedureBase procedure =
                     FactoryUtil.discoverFactory(
                                     ProcedureBase.class.getClassLoader(),
                                     ProcedureBase.class,
-                                    procedureName)
+                                    procedurePath.getObjectName())
                             .withCatalog(catalog);
             return Optional.of(procedure);
         } catch (FactoryException e) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 47c2d4c86..6ff4df5a1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -31,10 +31,10 @@ import org.apache.flink.table.procedure.ProcedureContext;
  *
  * <pre><code>
  *  -- reset the new next snapshot id in the consumer
- *  CALL reset_consumer('tableId', 'consumerId', nextSnapshotId)
+ *  CALL sys.reset_consumer('tableId', 'consumerId', nextSnapshotId)
  *
  *  -- delete consumer
- *  CALL reset_consumer('tableId', 'consumerId')
+ *  CALL sys.reset_consumer('tableId', 'consumerId')
  * </code></pre>
  */
 public class ResetConsumerProcedure extends ProcedureBase {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index 38753bd74..1bf545004 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -29,10 +29,10 @@ import org.apache.flink.table.procedure.ProcedureContext;
  *
  * <pre><code>
  *  -- rollback to a snapshot
- *  CALL rollback_to('tableId', snapshotId)
+ *  CALL sys.rollback_to('tableId', snapshotId)
  *
  *  -- rollback to a tag
- *  CALL rollback_to('tableId', 'tagName')
+ *  CALL sys.rollback_to('tableId', 'tagName')
  * </code></pre>
  */
 public class RollbackToProcedure extends ProcedureBase {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 8cd1fb2c5..eaea0e3e9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -301,7 +301,7 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
     private void callProcedure(boolean isStreaming) {
         callProcedure(
                 String.format(
-                        "CALL compact('%s.%s', '%s')",
+                        "CALL sys.compact('%s.%s', '%s')",
                         database, tableName, 
"dt=20221208,hh=15;dt=20221209,hh=15"),
                 isStreaming,
                 !isStreaming);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 492d98434..4f456a611 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -135,7 +135,7 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                     .build(env);
             env.execute();
         } else {
-            callProcedure(String.format("CALL compact_database('', '%s')", 
mode), false, true);
+            callProcedure(String.format("CALL sys.compact_database('', '%s')", 
mode), false, true);
         }
 
         for (FileStoreTable table : tables) {
@@ -218,10 +218,10 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             env.executeAsync();
         } else {
             if (mode.equals("divided")) {
-                callProcedure("CALL compact_database()", true, false);
+                callProcedure("CALL sys.compact_database()", true, false);
             } else {
                 callProcedure(
-                        "CALL compact_database('', 'combined', '', '', 
'continuous.discovery-interval=1s')",
+                        "CALL sys.compact_database('', 'combined', '', '', 
'continuous.discovery-interval=1s')",
                         true,
                         false);
             }
@@ -478,14 +478,14 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             if (mode.equals("divided")) {
                 callProcedure(
                         String.format(
-                                "CALL compact_database('', 'divided', '%s', 
'%s')",
+                                "CALL sys.compact_database('', 'divided', 
'%s', '%s')",
                                 nonNull(includingPattern), 
nonNull(excludesPattern)),
                         false,
                         true);
             } else {
                 callProcedure(
                         String.format(
-                                "CALL compact_database('', 'combined', '%s', 
'%s', 'continuous.discovery-interval=1s')",
+                                "CALL sys.compact_database('', 'combined', 
'%s', '%s', 'continuous.discovery-interval=1s')",
                                 nonNull(includingPattern), 
nonNull(excludesPattern)),
                         false,
                         true);
@@ -573,7 +573,7 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
             env.executeAsync();
         } else {
-            callProcedure("CALL compact_database()");
+            callProcedure("CALL sys.compact_database()");
         }
 
         for (FileStoreTable table : tables) {
@@ -640,7 +640,7 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
             env.execute();
         } else {
-            callProcedure("CALL compact_database()", false, true);
+            callProcedure("CALL sys.compact_database()", false, true);
         }
 
         for (FileStoreTable table : tables) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index c3aee72c5..771a9e59d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -87,7 +87,8 @@ public class ConsumerActionITCase extends ActionITCaseBase {
                     .run();
         } else {
             callProcedure(
-                    String.format("CALL reset_consumer('%s.%s', 'myid', 1)", 
database, tableName));
+                    String.format(
+                            "CALL sys.reset_consumer('%s.%s', 'myid', 1)", 
database, tableName));
         }
         Optional<Consumer> consumer2 = consumerManager.consumer("myid");
         assertThat(consumer2).isPresent();
@@ -99,7 +100,7 @@ public class ConsumerActionITCase extends ActionITCaseBase {
                     .run();
         } else {
             callProcedure(
-                    String.format("CALL reset_consumer('%s.%s', 'myid')", 
database, tableName));
+                    String.format("CALL sys.reset_consumer('%s.%s', 'myid')", 
database, tableName));
         }
         Optional<Consumer> consumer3 = consumerManager.consumer("myid");
         assertThat(consumer3).isNotPresent();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index ca22dfcf4..36358a0e0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -66,7 +66,8 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
         } else {
             callProcedure(
                     String.format(
-                            "CALL drop_partition('%s.%s', 'partKey0 = 0')", 
database, tableName));
+                            "CALL sys.drop_partition('%s.%s', 'partKey0 = 0')",
+                            database, tableName));
         }
 
         SnapshotManager snapshotManager = 
getFileStoreTable(tableName).snapshotManager();
@@ -124,7 +125,7 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
         } else {
             callProcedure(
                     String.format(
-                            "CALL drop_partition('%s.%s', 
'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')",
+                            "CALL sys.drop_partition('%s.%s', 
'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')",
                             database, tableName));
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index c3ef32450..53cc1f6b6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -126,7 +126,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
                         NOT_MATCHED_BY_SOURCE_DELETE);
         String procedureStatement =
                 String.format(
-                        "CALL merge_into('%s.T', '', '', 'default.S', 'T.k = 
S.k AND T.dt = S.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', '', '', 'default.S', 'T.k 
= S.k AND T.dt = S.dt', '%s', %s)",
                         database,
                         mergeActions,
                         "'T.v <> S.v AND S.v IS NOT NULL', "
@@ -200,7 +200,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         String procedureStatement =
                 String.format(
-                        "CALL merge_into('%s.T', 'TT', '', 'S', 'TT.k = S.k 
AND TT.dt = S.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k = 
S.k AND TT.dt = S.dt', '%s', %s)",
                         inDefault ? database : "test_db", MATCHED_DELETE, 
"'S.v IS NULL'");
 
         validateActionRunResult(
@@ -242,7 +242,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         String procedureStatement =
                 String.format(
-                        "CALL merge_into('default.T', '', '', '%s', 'T.k = S.k 
AND T.dt = S.dt', '%s', %s)",
+                        "CALL sys.merge_into('default.T', '', '', '%s', 'T.k = 
S.k AND T.dt = S.dt', '%s', %s)",
                         sourceTableName, MATCHED_DELETE, "'S.v IS NULL'");
 
         if (!inDefault) {
@@ -306,7 +306,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         String procedureStatement =
                 String.format(
-                        "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = S.k 
AND T.dt = S.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
S.k AND T.dt = S.dt', '%s', %s)",
                         database,
                         useCatalog
                                 ? String.format(
@@ -346,7 +346,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         String procedureStatement =
                 String.format(
-                        "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k 
AND T.dt = SS.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
SS.k AND T.dt = SS.dt', '%s', %s)",
                         database,
                         "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', 
dt FROM S",
                         qualified ? "default.SS" : "SS",
@@ -390,7 +390,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         String procedureStatement =
                 String.format(
-                        "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k 
AND T.dt = SS.dt', '%s', %s)",
+                        "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = 
SS.k AND T.dt = SS.dt', '%s', %s)",
                         database,
                         "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', 
dt FROM S",
                         qualified ? "default.SS" : "SS",
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index e6f6cea6b..6ecac880d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -69,7 +69,7 @@ public class RollbackToActionITCase extends ActionITCaseBase {
         if (ThreadLocalRandom.current().nextBoolean()) {
             new RollbackToAction(warehouse, database, tableName, "2", 
Collections.emptyMap()).run();
         } else {
-            callProcedure(String.format("CALL rollback_to('%s.%s', 2)", 
database, tableName));
+            callProcedure(String.format("CALL sys.rollback_to('%s.%s', 2)", 
database, tableName));
         }
 
         testBatchRead(
@@ -97,10 +97,13 @@ public class RollbackToActionITCase extends 
ActionITCaseBase {
         table.createTag("tag2", 2);
         table.createTag("tag3", 3);
 
-        RollbackToAction action =
-                new RollbackToAction(
-                        warehouse, database, tableName, "tag2", 
Collections.emptyMap());
-        action.run();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new RollbackToAction(warehouse, database, tableName, "tag2", 
Collections.emptyMap())
+                    .run();
+        } else {
+            callProcedure(
+                    String.format("CALL sys.rollback_to('%s.%s', 'tag2')", 
database, tableName));
+        }
 
         testBatchRead(
                 "SELECT * FROM `" + tableName + "`",
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 104962f3a..ff1d72176 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -187,7 +187,7 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
     private void callProcedure(String orderStrategy, List<String> 
orderByColumns) {
         callProcedure(
                 String.format(
-                        "CALL compact('%s.%s', 'ALL', '%s', '%s')",
+                        "CALL sys.compact('%s.%s', 'ALL', '%s', '%s')",
                         database, tableName, orderStrategy, String.join(",", 
orderByColumns)),
                 false,
                 true);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index 00bdbecae..bd989aac9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -264,7 +264,7 @@ public class SortCompactActionForUnawareBucketITCase 
extends ActionITCaseBase {
     private void callProcedure(String orderStrategy, List<String> 
orderByColumns) {
         callProcedure(
                 String.format(
-                        "CALL compact('%s.%s', 'ALL', '%s', '%s')",
+                        "CALL sys.compact('%s.%s', 'ALL', '%s', '%s')",
                         database, tableName, orderStrategy, String.join(",", 
orderByColumns)),
                 false,
                 true);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index 4f44d672f..9691e66fe 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -71,7 +71,7 @@ public class TagActionITCase extends ActionITCaseBase {
                     .run();
         } else {
             callProcedure(
-                    String.format("CALL create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
+                    String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
         }
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
@@ -84,7 +84,8 @@ public class TagActionITCase extends ActionITCaseBase {
             new DeleteTagAction(warehouse, database, tableName, 
Collections.emptyMap(), "tag2")
                     .run();
         } else {
-            callProcedure(String.format("CALL delete_tag('%s.%s', 'tag2')", 
database, tableName));
+            callProcedure(
+                    String.format("CALL sys.delete_tag('%s.%s', 'tag2')", 
database, tableName));
         }
         assertThat(tagManager.tagExists("tag2")).isFalse();
     }

Reply via email to