This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1a66684c8 [flink] Place all the built-in procedures into sys database
(#2140)
1a66684c8 is described below
commit 1a66684c8fca6733a3fed89a5f4f400e7ef10bd9
Author: yuzelin <[email protected]>
AuthorDate: Tue Oct 17 16:43:01 2023 +0800
[flink] Place all the built-in procedures into sys database (#2140)
---
.../main/java/org/apache/paimon/flink/FlinkCatalog.java | 2 +-
.../paimon/flink/procedure/CompactDatabaseProcedure.java | 12 ++++++------
.../apache/paimon/flink/procedure/CompactProcedure.java | 6 +++---
.../apache/paimon/flink/procedure/CreateTagProcedure.java | 2 +-
.../apache/paimon/flink/procedure/DeleteTagProcedure.java | 2 +-
.../paimon/flink/procedure/DropPartitionProcedure.java | 2 +-
.../apache/paimon/flink/procedure/MergeIntoProcedure.java | 2 +-
.../org/apache/paimon/flink/procedure/ProcedureUtil.java | 8 ++++++--
.../paimon/flink/procedure/ResetConsumerProcedure.java | 4 ++--
.../apache/paimon/flink/procedure/RollbackToProcedure.java | 4 ++--
.../apache/paimon/flink/action/CompactActionITCase.java | 2 +-
.../paimon/flink/action/CompactDatabaseActionITCase.java | 14 +++++++-------
.../apache/paimon/flink/action/ConsumerActionITCase.java | 5 +++--
.../paimon/flink/action/DropPartitionActionITCase.java | 5 +++--
.../apache/paimon/flink/action/MergeIntoActionITCase.java | 12 ++++++------
.../apache/paimon/flink/action/RollbackToActionITCase.java | 13 ++++++++-----
.../action/SortCompactActionForDynamicBucketITCase.java | 2 +-
.../action/SortCompactActionForUnawareBucketITCase.java | 2 +-
.../org/apache/paimon/flink/action/TagActionITCase.java | 5 +++--
19 files changed, 57 insertions(+), 47 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b14a6d97e..53916ce89 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -897,7 +897,7 @@ public class FlinkCatalog extends AbstractCatalog {
*/
public Procedure getProcedure(ObjectPath procedurePath)
throws ProcedureNotExistException, CatalogException {
- return ProcedureUtil.getProcedure(catalog,
procedurePath.getObjectName())
+ return ProcedureUtil.getProcedure(catalog, procedurePath)
.orElseThrow(() -> new ProcedureNotExistException(name,
procedurePath));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 3d6189902..a4cd48654 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -35,22 +35,22 @@ import static
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKe
* -- NOTE: use '' as placeholder for optional arguments
*
* -- compact all databases
- * CALL compact_database()
+ * CALL sys.compact_database()
*
* -- compact some databases (accept regular expression)
- * CALL compact_database('includingDatabases')
+ * CALL sys.compact_database('includingDatabases')
*
* -- set compact mode
- * CALL compact_database('includingDatabases', 'mode')
+ * CALL sys.compact_database('includingDatabases', 'mode')
*
* -- compact some tables (accept regular expression)
- * CALL compact_database('includingDatabases', 'mode', 'includingTables')
+ * CALL sys.compact_database('includingDatabases', 'mode', 'includingTables')
*
* -- exclude some tables (accept regular expression)
- * CALL compact_database('includingDatabases', 'mode', 'includingTables',
'excludingTables')
+ * CALL sys.compact_database('includingDatabases', 'mode', 'includingTables',
'excludingTables')
*
* -- set table options ('k=v,...')
- * CALL compact_database('includingDatabases', 'mode', 'includingTables',
'excludingTables', 'tableOptions')
+ * CALL sys.compact_database('includingDatabases', 'mode', 'includingTables',
'excludingTables', 'tableOptions')
* </code></pre>
*/
public class CompactDatabaseProcedure extends ProcedureBase {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 1ae8fdcf6..07bddf1db 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -37,13 +37,13 @@ import java.util.Map;
* -- NOTE: use '' as placeholder for optional arguments
*
* -- compact a table (tableId should be 'database_name.table_name')
- * CALL compact('tableId')
+ * CALL sys.compact('tableId')
*
* -- compact specific partitions ('pt1=A,pt2=a;pt1=B,pt2=b', ...)
- * CALL compact('tableId', 'pt1=A,pt2=a;pt1=B,pt2=b')
+ * CALL sys.compact('tableId', 'pt1=A,pt2=a;pt1=B,pt2=b')
*
* -- compact a table with sorting
- * CALL compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2',
'sink.parallelism=6')
+ * CALL sys.compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2',
'sink.parallelism=6')
*
* </code></pre>
*/
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index ac8bcff0f..9cd4067bc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
* Create tag procedure. Usage:
*
* <pre><code>
- * CALL create_tag('tableId', 'tagName', snapshotId)
+ * CALL sys.create_tag('tableId', 'tagName', snapshotId)
* </code></pre>
*/
public class CreateTagProcedure extends ProcedureBase {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
index 3991f5019..931448937 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
* Delete tag procedure. Usage:
*
* <pre><code>
- * CALL delete_tag('tableId', 'tagName')
+ * CALL sys.delete_tag('tableId', 'tagName')
* </code></pre>
*/
public class DeleteTagProcedure extends ProcedureBase {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index c443bc4c2..e38d04f72 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -34,7 +34,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
* Drop partition procedure. Usage:
*
* <pre><code>
- * CALL drop_partition('tableId', 'partition1', 'partition2', ...)
+ * CALL sys.drop_partition('tableId', 'partition1', 'partition2', ...)
* </code></pre>
*/
public class DropPartitionProcedure extends ProcedureBase {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index 6c399fd79..e2c14e0c3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -42,7 +42,7 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
* Merge Into procedure. Usage:
*
* <pre><code>
- * CALL merge_into(
+ * CALL sys.merge_into(
* 'targetTableId',
* 'targetAlias',
* 'sourceSqls', -- separate with ';'
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index 1e06a0a5f..a50bee25b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.factories.FactoryException;
import org.apache.paimon.factories.FactoryUtil;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.procedures.Procedure;
import java.util.Collections;
@@ -39,13 +40,16 @@ public class ProcedureUtil {
ProcedureBase.class.getClassLoader(),
ProcedureBase.class));
}
- public static Optional<Procedure> getProcedure(Catalog catalog, String
procedureName) {
+ public static Optional<Procedure> getProcedure(Catalog catalog, ObjectPath
procedurePath) {
+ if
(!Catalog.SYSTEM_DATABASE_NAME.equals(procedurePath.getDatabaseName())) {
+ return Optional.empty();
+ }
try {
ProcedureBase procedure =
FactoryUtil.discoverFactory(
ProcedureBase.class.getClassLoader(),
ProcedureBase.class,
- procedureName)
+ procedurePath.getObjectName())
.withCatalog(catalog);
return Optional.of(procedure);
} catch (FactoryException e) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 47c2d4c86..6ff4df5a1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -31,10 +31,10 @@ import org.apache.flink.table.procedure.ProcedureContext;
*
* <pre><code>
* -- reset the new next snapshot id in the consumer
- * CALL reset_consumer('tableId', 'consumerId', nextSnapshotId)
+ * CALL sys.reset_consumer('tableId', 'consumerId', nextSnapshotId)
*
* -- delete consumer
- * CALL reset_consumer('tableId', 'consumerId')
+ * CALL sys.reset_consumer('tableId', 'consumerId')
* </code></pre>
*/
public class ResetConsumerProcedure extends ProcedureBase {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index 38753bd74..1bf545004 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -29,10 +29,10 @@ import org.apache.flink.table.procedure.ProcedureContext;
*
* <pre><code>
* -- rollback to a snapshot
- * CALL rollback_to('tableId', snapshotId)
+ * CALL sys.rollback_to('tableId', snapshotId)
*
* -- rollback to a tag
- * CALL rollback_to('tableId', 'tagName')
+ * CALL sys.rollback_to('tableId', 'tagName')
* </code></pre>
*/
public class RollbackToProcedure extends ProcedureBase {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 8cd1fb2c5..eaea0e3e9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -301,7 +301,7 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
private void callProcedure(boolean isStreaming) {
callProcedure(
String.format(
- "CALL compact('%s.%s', '%s')",
+ "CALL sys.compact('%s.%s', '%s')",
database, tableName,
"dt=20221208,hh=15;dt=20221209,hh=15"),
isStreaming,
!isStreaming);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 492d98434..4f456a611 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -135,7 +135,7 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
.build(env);
env.execute();
} else {
- callProcedure(String.format("CALL compact_database('', '%s')",
mode), false, true);
+ callProcedure(String.format("CALL sys.compact_database('', '%s')",
mode), false, true);
}
for (FileStoreTable table : tables) {
@@ -218,10 +218,10 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
env.executeAsync();
} else {
if (mode.equals("divided")) {
- callProcedure("CALL compact_database()", true, false);
+ callProcedure("CALL sys.compact_database()", true, false);
} else {
callProcedure(
- "CALL compact_database('', 'combined', '', '',
'continuous.discovery-interval=1s')",
+ "CALL sys.compact_database('', 'combined', '', '',
'continuous.discovery-interval=1s')",
true,
false);
}
@@ -478,14 +478,14 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
if (mode.equals("divided")) {
callProcedure(
String.format(
- "CALL compact_database('', 'divided', '%s',
'%s')",
+ "CALL sys.compact_database('', 'divided',
'%s', '%s')",
nonNull(includingPattern),
nonNull(excludesPattern)),
false,
true);
} else {
callProcedure(
String.format(
- "CALL compact_database('', 'combined', '%s',
'%s', 'continuous.discovery-interval=1s')",
+ "CALL sys.compact_database('', 'combined',
'%s', '%s', 'continuous.discovery-interval=1s')",
nonNull(includingPattern),
nonNull(excludesPattern)),
false,
true);
@@ -573,7 +573,7 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
env.executeAsync();
} else {
- callProcedure("CALL compact_database()");
+ callProcedure("CALL sys.compact_database()");
}
for (FileStoreTable table : tables) {
@@ -640,7 +640,7 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
env.execute();
} else {
- callProcedure("CALL compact_database()", false, true);
+ callProcedure("CALL sys.compact_database()", false, true);
}
for (FileStoreTable table : tables) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index c3aee72c5..771a9e59d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -87,7 +87,8 @@ public class ConsumerActionITCase extends ActionITCaseBase {
.run();
} else {
callProcedure(
- String.format("CALL reset_consumer('%s.%s', 'myid', 1)",
database, tableName));
+ String.format(
+ "CALL sys.reset_consumer('%s.%s', 'myid', 1)",
database, tableName));
}
Optional<Consumer> consumer2 = consumerManager.consumer("myid");
assertThat(consumer2).isPresent();
@@ -99,7 +100,7 @@ public class ConsumerActionITCase extends ActionITCaseBase {
.run();
} else {
callProcedure(
- String.format("CALL reset_consumer('%s.%s', 'myid')",
database, tableName));
+ String.format("CALL sys.reset_consumer('%s.%s', 'myid')",
database, tableName));
}
Optional<Consumer> consumer3 = consumerManager.consumer("myid");
assertThat(consumer3).isNotPresent();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index ca22dfcf4..36358a0e0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -66,7 +66,8 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
} else {
callProcedure(
String.format(
- "CALL drop_partition('%s.%s', 'partKey0 = 0')",
database, tableName));
+ "CALL sys.drop_partition('%s.%s', 'partKey0 = 0')",
+ database, tableName));
}
SnapshotManager snapshotManager =
getFileStoreTable(tableName).snapshotManager();
@@ -124,7 +125,7 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
} else {
callProcedure(
String.format(
- "CALL drop_partition('%s.%s',
'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')",
+ "CALL sys.drop_partition('%s.%s',
'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')",
database, tableName));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index c3ef32450..53cc1f6b6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -126,7 +126,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
NOT_MATCHED_BY_SOURCE_DELETE);
String procedureStatement =
String.format(
- "CALL merge_into('%s.T', '', '', 'default.S', 'T.k =
S.k AND T.dt = S.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', '', '', 'default.S', 'T.k
= S.k AND T.dt = S.dt', '%s', %s)",
database,
mergeActions,
"'T.v <> S.v AND S.v IS NOT NULL', "
@@ -200,7 +200,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
String procedureStatement =
String.format(
- "CALL merge_into('%s.T', 'TT', '', 'S', 'TT.k = S.k
AND TT.dt = S.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k =
S.k AND TT.dt = S.dt', '%s', %s)",
inDefault ? database : "test_db", MATCHED_DELETE,
"'S.v IS NULL'");
validateActionRunResult(
@@ -242,7 +242,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
String procedureStatement =
String.format(
- "CALL merge_into('default.T', '', '', '%s', 'T.k = S.k
AND T.dt = S.dt', '%s', %s)",
+ "CALL sys.merge_into('default.T', '', '', '%s', 'T.k =
S.k AND T.dt = S.dt', '%s', %s)",
sourceTableName, MATCHED_DELETE, "'S.v IS NULL'");
if (!inDefault) {
@@ -306,7 +306,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
String procedureStatement =
String.format(
- "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = S.k
AND T.dt = S.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
S.k AND T.dt = S.dt', '%s', %s)",
database,
useCatalog
? String.format(
@@ -346,7 +346,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
String procedureStatement =
String.format(
- "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k
AND T.dt = SS.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
SS.k AND T.dt = SS.dt', '%s', %s)",
database,
"CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'',
dt FROM S",
qualified ? "default.SS" : "SS",
@@ -390,7 +390,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
String procedureStatement =
String.format(
- "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k
AND T.dt = SS.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
SS.k AND T.dt = SS.dt', '%s', %s)",
database,
"CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'',
dt FROM S",
qualified ? "default.SS" : "SS",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index e6f6cea6b..6ecac880d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -69,7 +69,7 @@ public class RollbackToActionITCase extends ActionITCaseBase {
if (ThreadLocalRandom.current().nextBoolean()) {
new RollbackToAction(warehouse, database, tableName, "2",
Collections.emptyMap()).run();
} else {
- callProcedure(String.format("CALL rollback_to('%s.%s', 2)",
database, tableName));
+ callProcedure(String.format("CALL sys.rollback_to('%s.%s', 2)",
database, tableName));
}
testBatchRead(
@@ -97,10 +97,13 @@ public class RollbackToActionITCase extends
ActionITCaseBase {
table.createTag("tag2", 2);
table.createTag("tag3", 3);
- RollbackToAction action =
- new RollbackToAction(
- warehouse, database, tableName, "tag2",
Collections.emptyMap());
- action.run();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new RollbackToAction(warehouse, database, tableName, "tag2",
Collections.emptyMap())
+ .run();
+ } else {
+ callProcedure(
+ String.format("CALL sys.rollback_to('%s.%s', 'tag2')",
database, tableName));
+ }
testBatchRead(
"SELECT * FROM `" + tableName + "`",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 104962f3a..ff1d72176 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -187,7 +187,7 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
private void callProcedure(String orderStrategy, List<String>
orderByColumns) {
callProcedure(
String.format(
- "CALL compact('%s.%s', 'ALL', '%s', '%s')",
+ "CALL sys.compact('%s.%s', 'ALL', '%s', '%s')",
database, tableName, orderStrategy, String.join(",",
orderByColumns)),
false,
true);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index 00bdbecae..bd989aac9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -264,7 +264,7 @@ public class SortCompactActionForUnawareBucketITCase
extends ActionITCaseBase {
private void callProcedure(String orderStrategy, List<String>
orderByColumns) {
callProcedure(
String.format(
- "CALL compact('%s.%s', 'ALL', '%s', '%s')",
+ "CALL sys.compact('%s.%s', 'ALL', '%s', '%s')",
database, tableName, orderStrategy, String.join(",",
orderByColumns)),
false,
true);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index 4f44d672f..9691e66fe 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -71,7 +71,7 @@ public class TagActionITCase extends ActionITCaseBase {
.run();
} else {
callProcedure(
- String.format("CALL create_tag('%s.%s', 'tag2', 2)",
database, tableName));
+ String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)",
database, tableName));
}
assertThat(tagManager.tagExists("tag2")).isTrue();
@@ -84,7 +84,8 @@ public class TagActionITCase extends ActionITCaseBase {
new DeleteTagAction(warehouse, database, tableName,
Collections.emptyMap(), "tag2")
.run();
} else {
- callProcedure(String.format("CALL delete_tag('%s.%s', 'tag2')",
database, tableName));
+ callProcedure(
+ String.format("CALL sys.delete_tag('%s.%s', 'tag2')",
database, tableName));
}
assertThat(tagManager.tagExists("tag2")).isFalse();
}