This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 893dbcf53 [flink] Support flink migrate hive database (#2827)
893dbcf53 is described below

commit 893dbcf5365b8fbdb4778a9a30e191b14287d9c0
Author: TaoZex <[email protected]>
AuthorDate: Thu Feb 22 10:47:28 2024 +0800

    [flink] Support flink migrate hive database (#2827)
---
 docs/content/migration/migration-from-hive.md      |  49 ++++++-
 paimon-flink/paimon-flink-common/pom.xml           |  37 ++++++
 .../paimon/flink/action/MigrateDatabaseAction.java |  52 ++++++++
 .../flink/action/MigrateDatabaseActionFactory.java |  64 +++++++++
 .../flink/procedure/MigrateDatabaseProcedure.java  |  84 ++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   2 +
 ...se.java => MigrateDatabaseProcedureITCase.java} | 148 ++++++++++++++-------
 .../procedure/MigrateTableProcedureITCase.java     |   4 +-
 8 files changed, 389 insertions(+), 51 deletions(-)

diff --git a/docs/content/migration/migration-from-hive.md 
b/docs/content/migration/migration-from-hive.md
index 5bb86c79d..92b9e4c64 100644
--- a/docs/content/migration/migration-from-hive.md
+++ b/docs/content/migration/migration-from-hive.md
@@ -31,11 +31,13 @@ When migrating data to a paimon table, the origin table 
will be permanently disa
 still need the original table. The migrated table will be [unaware-bucket 
append table]({{< ref "concepts/append-table/append-scalable-table" >}}).
 
 Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate 
File Procedure to totally migrate a table from hive to paimon.
+At the same time, you can use paimon hive catalog with Migrate Database 
Procedure to fully synchronize all tables in the database to paimon.
 
 * Migrate Table Procedure: Paimon table does not exist, use the procedure 
upgrade hive table to paimon table. Hive table will disappear after action done.
+* Migrate Database Procedure: Paimon table does not exist, use the procedure 
upgrade all hive tables in database to paimon table. All hive tables will 
disappear after action done.
 * Migrate File Procedure:  Paimon table already exists, use the procedure to 
migrate files from hive table to paimon table. **Notice that, Hive table will 
also disappear after action done.**
 
-These two actions now support file format of hive "orc" and "parquet" and 
"avro".
+These three actions now support file format of hive "orc" and "parquet" and 
"avro".
 
 <span style="color: red; "> **We highly recommend to back up hive table data 
before migrating, because migrating action is not atomic. If been interrupted 
while migrating, you may lose your data.** </span>
 
@@ -86,6 +88,51 @@ Example:
 --table default.hive_or_paimon \
 ```
 
+**Migrate Hive Database**
+
+Command: <br>
+
+***CALL <font color="green">sys.migrate_database</font>(&#39;hive&#39;, 
&#39;&lt;hive_database&gt;&#39;, &#39;&lt;paimon_tableconf&gt;&#39;);***
+
+**Example**
+
+```sql
+CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+
+USE CATALOG PAIMON;
+
+CALL sys.migrate_database('hive', 'default', 'file.format=orc');
+```
+After invoke, all tables in "default" database will totally convert to paimon 
format. Writing and reading the table by old "hive way" will fail.
+We can add our table properties while importing by 
sys.migrate_database('<database>', '<tableproperties>').
+<tableproperties> here should be separated by ",".  For example:
+
+```sql
+CALL sys.migrate_database('hive', 'my_db', 
'file.format=orc,read.batch-size=2096,write-only=true')
+```
+
+If your flink version is below 1.17, you can use flink action to achieve this:
+```bash
+<FLINK_HOME>/bin/flink run \
+/path/to/paimon-flink-action-{{< version >}}.jar \
+migrate_databse
+--warehouse <warehouse-path> \
+--source_type hive \
+--database <database> \
+[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> 
...]] \
+[--options <paimon-table-conf  [,paimon-table-conf ...]> ]
+```
+
+Example:
+```bash
+<FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar migrate_table 
\
+--warehouse /path/to/warehouse \
+--catalog_conf uri=thrift://localhost:9083 \
+--catalog_conf metastore=hive \
+--source_type hive \
+--database default \
+```
+
 **Migrate Hive File**
 
 Command: <br>
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 4fd85a7de..d486aaa42 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -67,6 +67,43 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
new file mode 100644
index 000000000..40c193f8a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateDatabaseProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Migrate from external all hive table in database to paimon table. */
+public class MigrateDatabaseAction extends ActionBase {
+    private final String connector;
+    private final String hiveDatabaseName;
+    private final String tableProperties;
+
+    public MigrateDatabaseAction(
+            String connector,
+            String warehouse,
+            String hiveDatabaseName,
+            Map<String, String> catalogConfig,
+            String tableProperties) {
+        super(warehouse, catalogConfig);
+        this.connector = connector;
+        this.hiveDatabaseName = hiveDatabaseName;
+        this.tableProperties = tableProperties;
+    }
+
+    @Override
+    public void run() throws Exception {
+        MigrateDatabaseProcedure migrateDatabaseProcedure = new 
MigrateDatabaseProcedure();
+        migrateDatabaseProcedure.withCatalog(catalog);
+        migrateDatabaseProcedure.call(
+                new DefaultProcedureContext(env), connector, hiveDatabaseName, 
tableProperties);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
new file mode 100644
index 000000000..25bb50e79
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateDatabaseAction}. */
+public class MigrateDatabaseActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_database";
+
+    private static final String SOURCE_TYPE = "source_type";
+    private static final String OPTIONS = "options";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        String warehouse = params.get(WAREHOUSE);
+        String connector = params.get(SOURCE_TYPE);
+        String sourceHiveDatabase = params.get(DATABASE);
+        Map<String, String> catalogConfig = optionalConfigMap(params, 
CATALOG_CONF);
+        String tableConf = params.get(OPTIONS);
+
+        MigrateDatabaseAction migrateDatabaseAction =
+                new MigrateDatabaseAction(
+                        connector, warehouse, sourceHiveDatabase, 
catalogConfig, tableConf);
+        return Optional.of(migrateDatabaseAction);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"migrate_database\" migrate all tables in database 
from hive to paimon.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  migrate_database --warehouse <warehouse_path> --source_type 
hive "
+                        + "--database <database_name> "
+                        + "[--catalog_conf <key>=<value] "
+                        + "[--options <key>=<value>,<key>=<value>,...]");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
new file mode 100644
index 000000000..4f2a849c9
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/** Migrate procedure to migrate all hive tables in database to paimon table. 
*/
+public class MigrateDatabaseProcedure extends ProcedureBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MigrateDatabaseProcedure.class);
+    private static final String PAIMON_SUFFIX = "_paimon_";
+
+    @Override
+    public String identifier() {
+        return "migrate_database";
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String connector, String 
sourceDatabasePath)
+            throws Exception {
+        return call(procedureContext, connector, sourceDatabasePath, "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String connector,
+            String sourceDatabasePath,
+            String properties)
+            throws Exception {
+        if (!(catalog instanceof HiveCatalog)) {
+            throw new IllegalArgumentException("Only support Hive Catalog");
+        }
+        HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
+        IMetaStoreClient client = hiveCatalog.getHmsClient();
+        List<String> sourceTables = client.getAllTables(sourceDatabasePath);
+        for (String sourceTable : sourceTables) {
+            String sourceTablePath = sourceDatabasePath + "." + sourceTable;
+            String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+
+            Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+            Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
+
+            TableMigrationUtils.getImporter(
+                            connector,
+                            (HiveCatalog) this.catalog,
+                            sourceTableId.getDatabaseName(),
+                            sourceTableId.getObjectName(),
+                            targetTableId.getDatabaseName(),
+                            targetTableId.getObjectName(),
+                            
ParameterUtils.parseCommaSeparatedKeyValues(properties))
+                    .executeMigrate();
+
+            LOG.info("rename " + targetTableId + " to " + sourceTableId);
+            this.catalog.renameTable(targetTableId, sourceTableId, false);
+        }
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index e4b74f5cd..04f2b7933 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -24,6 +24,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory
 org.apache.paimon.flink.action.DeleteTagActionFactory
 org.apache.paimon.flink.action.ResetConsumerActionFactory
 org.apache.paimon.flink.action.MigrateTableActionFactory
+org.apache.paimon.flink.action.MigrateDatabaseActionFactory
 org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
 org.apache.paimon.flink.action.QueryServiceActionFactory
 
@@ -39,6 +40,7 @@ org.apache.paimon.flink.procedure.MergeIntoProcedure
 org.apache.paimon.flink.procedure.ResetConsumerProcedure
 org.apache.paimon.flink.procedure.RollbackToProcedure
 org.apache.paimon.flink.procedure.MigrateTableProcedure
+org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
 org.apache.paimon.flink.procedure.MigrateFileProcedure
 org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
similarity index 56%
copy from 
paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
copy to 
paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
index daea65cf2..70345e4f8 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
@@ -19,8 +19,8 @@
 package org.apache.paimon.hive.procedure;
 
 import org.apache.paimon.flink.action.ActionITCaseBase;
-import org.apache.paimon.flink.action.MigrateTableAction;
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
+import org.apache.paimon.flink.action.MigrateDatabaseAction;
+import org.apache.paimon.flink.procedure.MigrateDatabaseProcedure;
 import org.apache.paimon.hive.TestHiveMetastore;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -42,14 +42,15 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
-/** Tests for {@link MigrateFileProcedure}. */
-public class MigrateTableProcedureITCase extends ActionITCaseBase {
+import static 
org.apache.paimon.hive.procedure.MigrateTableProcedureITCase.data;
+
+/** Tests for {@link MigrateDatabaseProcedure}. */
+public class MigrateDatabaseProcedureITCase extends ActionITCaseBase {
 
     private static final TestHiveMetastore TEST_HIVE_METASTORE = new 
TestHiveMetastore();
 
-    private static final int PORT = 9084;
+    private static final int PORT = 9086;
 
     @BeforeEach
     public void beforeEach() {
@@ -76,7 +77,7 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
     }
 
     @Test
-    public void testParquetNonPartitionTable() throws Exception {
+    public void testParquet() throws Exception {
         testUpgradeNonPartitionTable("parquet");
         resetMetastore();
         testUpgradePartitionTable("parquet");
@@ -96,16 +97,33 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE my_database");
+        tEnv.executeSql("USE my_database");
+
+        // write data into my_database.hivetable1
+        tEnv.executeSql(
+                "CREATE TABLE hivetable1 (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
+                        + format);
+        tEnv.executeSql("INSERT INTO hivetable1 VALUES" + data(100)).await();
+        tEnv.executeSql("SHOW CREATE TABLE hivetable1");
+
+        // write data into my_database.hivetable2
         tEnv.executeSql(
-                "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
+                "CREATE TABLE hivetable2 (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
                         + format);
-        tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
-        tEnv.executeSql("SHOW CREATE TABLE hivetable");
+        tEnv.executeSql("INSERT INTO hivetable2 VALUES" + data(100)).await();
+
+        tEnv.executeSql("SHOW CREATE TABLE hivetable2");
 
         tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
         tEnv.useCatalog("PAIMON_GE");
-        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
+        List<Row> r1 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable1").collect());
+        List<Row> r3 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable2").collect());
 
         tEnv.executeSql(
                 "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 
'hive', 'uri' = 'thrift://localhost:"
@@ -115,13 +133,19 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
                         + "')");
         tEnv.useCatalog("PAIMON");
         tEnv.executeSql(
-                        "CALL sys.migrate_table('hive', 'default.hivetable', 
'file.format="
+                        "CALL sys.migrate_database('hive', 'my_database', 
'file.format="
                                 + format
                                 + "')")
                 .await();
-        List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
+        List<Row> r2 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable1").collect());
+        List<Row> r4 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable2").collect());
 
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+        Assertions.assertThatList(r3).containsExactlyInAnyOrderElementsOf(r4);
     }
 
     public void testUpgradeNonPartitionTable(String format) throws Exception {
@@ -132,14 +156,31 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
-        tEnv.executeSql("CREATE TABLE hivetable (id string, id2 int, id3 int) 
STORED AS " + format);
-        tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
-        tEnv.executeSql("SHOW CREATE TABLE hivetable");
+        tEnv.executeSql("CREATE DATABASE my_database");
+        tEnv.executeSql("USE my_database");
+
+        // write data into my_database.hivetable1
+        tEnv.executeSql(
+                "CREATE TABLE hivetable1 (id string, id2 int, id3 int) STORED 
AS " + format);
+        tEnv.executeSql("INSERT INTO hivetable1 VALUES" + data(100)).await();
+        tEnv.executeSql("SHOW CREATE TABLE hivetable1");
+
+        // write data into my_database.hivetable2
+        tEnv.executeSql(
+                "CREATE TABLE hivetable2 (id string, id2 int, id3 int) STORED 
AS " + format);
+        tEnv.executeSql("INSERT INTO hivetable2 VALUES" + data(100)).await();
+
+        tEnv.executeSql("SHOW CREATE TABLE hivetable2");
 
         tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
         tEnv.useCatalog("PAIMON_GE");
-        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
+        List<Row> r1 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable1").collect());
+        List<Row> r3 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable2").collect());
 
         tEnv.executeSql(
                 "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 
'hive', 'uri' = 'thrift://localhost:"
@@ -149,18 +190,24 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
                         + "')");
         tEnv.useCatalog("PAIMON");
         tEnv.executeSql(
-                        "CALL sys.migrate_table('hive', 'default.hivetable', 
'file.format="
+                        "CALL sys.migrate_database('hive', 'my_database', 
'file.format="
                                 + format
                                 + "')")
                 .await();
-        List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
+        List<Row> r2 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable1").collect());
+        List<Row> r4 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable2").collect());
 
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+        Assertions.assertThatList(r3).containsExactlyInAnyOrderElementsOf(r4);
     }
 
     @ParameterizedTest
     @ValueSource(strings = {"orc", "parquet", "avro"})
-    public void testMigrateAction(String format) throws Exception {
+    public void testMigrateDatabaseAction(String format) throws Exception {
         StreamExecutionEnvironment env = buildDefaultEnv(false);
 
         TableEnvironment tEnv =
@@ -168,27 +215,45 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE my_database");
+        tEnv.executeSql("USE my_database");
+
+        // write data into my_database.hivetable1
+        tEnv.executeSql(
+                "CREATE TABLE hivetable1 (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
+                        + format);
+        tEnv.executeSql("INSERT INTO hivetable1 VALUES" + data(100)).await();
+        tEnv.executeSql("SHOW CREATE TABLE hivetable1");
+
+        // write data into my_database.hivetable2
         tEnv.executeSql(
-                "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
+                "CREATE TABLE hivetable2 (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
                         + format);
-        tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
-        tEnv.executeSql("SHOW CREATE TABLE hivetable");
+        tEnv.executeSql("INSERT INTO hivetable2 VALUES" + data(100)).await();
+
+        tEnv.executeSql("SHOW CREATE TABLE hivetable2");
 
         tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
         tEnv.useCatalog("PAIMON_GE");
-        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
+        List<Row> r1 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable1").collect());
+        List<Row> r3 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable2").collect());
+
         Map<String, String> catalogConf = new HashMap<>();
         catalogConf.put("metastore", "hive");
         catalogConf.put("uri", "thrift://localhost:" + PORT);
-        MigrateTableAction migrateTableAction =
-                new MigrateTableAction(
+        MigrateDatabaseAction migrateDatabaseAction =
+                new MigrateDatabaseAction(
                         "hive",
                         
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
-                        "default.hivetable",
+                        "my_database",
                         catalogConf,
                         "");
-        migrateTableAction.run();
+        migrateDatabaseAction.run();
 
         tEnv.executeSql(
                 "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 
'hive', 'uri' = 'thrift://localhost:"
@@ -197,27 +262,14 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
                         + 
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
                         + "')");
         tEnv.useCatalog("PAIMON");
-        List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
+        List<Row> r2 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable1").collect());
+        List<Row> r4 =
+                ImmutableList.copyOf(
+                        tEnv.executeSql("SELECT * FROM 
my_database.hivetable2").collect());
 
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
-    }
-
-    private String data(int i) {
-        Random random = new Random();
-        StringBuilder stringBuilder = new StringBuilder();
-        for (int m = 0; m < i; m++) {
-            stringBuilder.append("(");
-            stringBuilder.append("\"");
-            stringBuilder.append('a' + m);
-            stringBuilder.append("\",");
-            stringBuilder.append(random.nextInt(10));
-            stringBuilder.append(",");
-            stringBuilder.append(random.nextInt(10));
-            stringBuilder.append(")");
-            if (m != i - 1) {
-                stringBuilder.append(",");
-            }
-        }
-        return stringBuilder.toString();
+        Assertions.assertThatList(r3).containsExactlyInAnyOrderElementsOf(r4);
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index daea65cf2..9c727ba1e 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -76,7 +76,7 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
     }
 
     @Test
-    public void testParquetNonPartitionTable() throws Exception {
+    public void testParquet() throws Exception {
         testUpgradeNonPartitionTable("parquet");
         resetMetastore();
         testUpgradePartitionTable("parquet");
@@ -202,7 +202,7 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
-    private String data(int i) {
+    protected static String data(int i) {
         Random random = new Random();
         StringBuilder stringBuilder = new StringBuilder();
         for (int m = 0; m < i; m++) {

Reply via email to