This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 893dbcf53 [flink] Support flink migrate hive database (#2827)
893dbcf53 is described below
commit 893dbcf5365b8fbdb4778a9a30e191b14287d9c0
Author: TaoZex <[email protected]>
AuthorDate: Thu Feb 22 10:47:28 2024 +0800
[flink] Support flink migrate hive database (#2827)
---
docs/content/migration/migration-from-hive.md | 49 ++++++-
paimon-flink/paimon-flink-common/pom.xml | 37 ++++++
.../paimon/flink/action/MigrateDatabaseAction.java | 52 ++++++++
.../flink/action/MigrateDatabaseActionFactory.java | 64 +++++++++
.../flink/procedure/MigrateDatabaseProcedure.java | 84 ++++++++++++
.../services/org.apache.paimon.factories.Factory | 2 +
...se.java => MigrateDatabaseProcedureITCase.java} | 148 ++++++++++++++-------
.../procedure/MigrateTableProcedureITCase.java | 4 +-
8 files changed, 389 insertions(+), 51 deletions(-)
diff --git a/docs/content/migration/migration-from-hive.md
b/docs/content/migration/migration-from-hive.md
index 5bb86c79d..92b9e4c64 100644
--- a/docs/content/migration/migration-from-hive.md
+++ b/docs/content/migration/migration-from-hive.md
@@ -31,11 +31,13 @@ When migrating data to a paimon table, the origin table
will be permanently disa
still need the original table. The migrated table will be [unaware-bucket
append table]({{< ref "concepts/append-table/append-scalable-table" >}}).
Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate
File Procedure to totally migrate a table from hive to paimon.
+At the same time, you can use paimon hive catalog with Migrate Database
Procedure to migrate all tables in a database to paimon. (Note that the hive tables are converted, not kept in sync.)
* Migrate Table Procedure: Paimon table does not exist, use the procedure
upgrade hive table to paimon table. Hive table will disappear after action done.
+* Migrate Database Procedure: Paimon tables do not exist, use the procedure to
upgrade all hive tables in the database to paimon tables. All hive tables will
disappear after the action is done.
* Migrate File Procedure: Paimon table already exists, use the procedure to
migrate files from hive table to paimon table. **Notice that, Hive table will
also disappear after action done.**
-These two actions now support file format of hive "orc" and "parquet" and
"avro".
+These three actions now support file format of hive "orc" and "parquet" and
"avro".
<span style="color: red; "> **We highly recommend to back up hive table data
before migrating, because migrating action is not atomic. If been interrupted
while migrating, you may lose your data.** </span>
@@ -86,6 +88,51 @@ Example:
--table default.hive_or_paimon \
```
+**Migrate Hive Database**
+
+Command: <br>
+
+***CALL <font color="green">sys.migrate_database</font>('hive',
'<hive_database>', '<paimon_tableconf>');***
+
+**Example**
+
+```sql
+CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' =
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+
+USE CATALOG PAIMON;
+
+CALL sys.migrate_database('hive', 'default', 'file.format=orc');
+```
+After invoke, all tables in "default" database will totally convert to paimon
format. Writing and reading the table by old "hive way" will fail.
+We can add our table properties while importing by
sys.migrate_database('hive', '<database>', '<tableproperties>').
+<tableproperties> here should be separated by ",". For example:
+
+```sql
+CALL sys.migrate_database('hive', 'my_db',
'file.format=orc,read.batch-size=2096,write-only=true')
+```
+
+If your flink version is below 1.17, you can use flink action to achieve this:
+```bash
+<FLINK_HOME>/bin/flink run \
+/path/to/paimon-flink-action-{{< version >}}.jar \
migrate_database
+--warehouse <warehouse-path> \
+--source_type hive \
+--database <database> \
+[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf>
...]] \
+[--options <paimon-table-conf [,paimon-table-conf ...]> ]
+```
+
+Example:
+```bash
+<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar migrate_database
\
+--warehouse /path/to/warehouse \
+--catalog_conf uri=thrift://localhost:9083 \
+--catalog_conf metastore=hive \
+--source_type hive \
+--database default \
+```
+
**Migrate Hive File**
Command: <br>
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 4fd85a7de..d486aaa42 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -67,6 +67,43 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
new file mode 100644
index 000000000..40c193f8a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateDatabaseProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Migrate from external all hive table in database to paimon table. */
+public class MigrateDatabaseAction extends ActionBase {
+ private final String connector;
+ private final String hiveDatabaseName;
+ private final String tableProperties;
+
+ public MigrateDatabaseAction(
+ String connector,
+ String warehouse,
+ String hiveDatabaseName,
+ Map<String, String> catalogConfig,
+ String tableProperties) {
+ super(warehouse, catalogConfig);
+ this.connector = connector;
+ this.hiveDatabaseName = hiveDatabaseName;
+ this.tableProperties = tableProperties;
+ }
+
+ @Override
+ public void run() throws Exception {
+ MigrateDatabaseProcedure migrateDatabaseProcedure = new
MigrateDatabaseProcedure();
+ migrateDatabaseProcedure.withCatalog(catalog);
+ migrateDatabaseProcedure.call(
+ new DefaultProcedureContext(env), connector, hiveDatabaseName,
tableProperties);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
new file mode 100644
index 000000000..25bb50e79
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateDatabaseAction}. */
+public class MigrateDatabaseActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "migrate_database";
+
+ private static final String SOURCE_TYPE = "source_type";
+ private static final String OPTIONS = "options";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ String warehouse = params.get(WAREHOUSE);
+ String connector = params.get(SOURCE_TYPE);
+ String sourceHiveDatabase = params.get(DATABASE);
+ Map<String, String> catalogConfig = optionalConfigMap(params,
CATALOG_CONF);
+ String tableConf = params.get(OPTIONS);
+
+ MigrateDatabaseAction migrateDatabaseAction =
+ new MigrateDatabaseAction(
+ connector, warehouse, sourceHiveDatabase,
catalogConfig, tableConf);
+ return Optional.of(migrateDatabaseAction);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"migrate_database\" migrate all tables in database
from hive to paimon.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " migrate_database --warehouse <warehouse_path> --source_type
hive "
+ + "--database <database_name> "
+                        + "[--catalog_conf <key>=<value>] "
+ + "[--options <key>=<value>,<key>=<value>,...]");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
new file mode 100644
index 000000000..4f2a849c9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/** Migrate procedure to migrate all hive tables in database to paimon table.
*/
+public class MigrateDatabaseProcedure extends ProcedureBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MigrateDatabaseProcedure.class);
+ private static final String PAIMON_SUFFIX = "_paimon_";
+
+ @Override
+ public String identifier() {
+ return "migrate_database";
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String connector, String
sourceDatabasePath)
+ throws Exception {
+ return call(procedureContext, connector, sourceDatabasePath, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceDatabasePath,
+ String properties)
+ throws Exception {
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive Catalog");
+ }
+ HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
+ IMetaStoreClient client = hiveCatalog.getHmsClient();
+ List<String> sourceTables = client.getAllTables(sourceDatabasePath);
+ for (String sourceTable : sourceTables) {
+ String sourceTablePath = sourceDatabasePath + "." + sourceTable;
+ String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+
+ Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+ Identifier targetTableId =
Identifier.fromString(targetPaimonTablePath);
+
+ TableMigrationUtils.getImporter(
+ connector,
+ (HiveCatalog) this.catalog,
+ sourceTableId.getDatabaseName(),
+ sourceTableId.getObjectName(),
+ targetTableId.getDatabaseName(),
+ targetTableId.getObjectName(),
+
ParameterUtils.parseCommaSeparatedKeyValues(properties))
+ .executeMigrate();
+
+ LOG.info("rename " + targetTableId + " to " + sourceTableId);
+ this.catalog.renameTable(targetTableId, sourceTableId, false);
+ }
+ return new String[] {"Success"};
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index e4b74f5cd..04f2b7933 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -24,6 +24,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
+org.apache.paimon.flink.action.MigrateDatabaseActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
org.apache.paimon.flink.action.QueryServiceActionFactory
@@ -39,6 +40,7 @@ org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
+org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
similarity index 56%
copy from
paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
copy to
paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
index daea65cf2..70345e4f8 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
@@ -19,8 +19,8 @@
package org.apache.paimon.hive.procedure;
import org.apache.paimon.flink.action.ActionITCaseBase;
-import org.apache.paimon.flink.action.MigrateTableAction;
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
+import org.apache.paimon.flink.action.MigrateDatabaseAction;
+import org.apache.paimon.flink.procedure.MigrateDatabaseProcedure;
import org.apache.paimon.hive.TestHiveMetastore;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -42,14 +42,15 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-/** Tests for {@link MigrateFileProcedure}. */
-public class MigrateTableProcedureITCase extends ActionITCaseBase {
+import static
org.apache.paimon.hive.procedure.MigrateTableProcedureITCase.data;
+
+/** Tests for {@link MigrateDatabaseProcedure}. */
+public class MigrateDatabaseProcedureITCase extends ActionITCaseBase {
private static final TestHiveMetastore TEST_HIVE_METASTORE = new
TestHiveMetastore();
- private static final int PORT = 9084;
+ private static final int PORT = 9086;
@BeforeEach
public void beforeEach() {
@@ -76,7 +77,7 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
}
@Test
- public void testParquetNonPartitionTable() throws Exception {
+ public void testParquet() throws Exception {
testUpgradeNonPartitionTable("parquet");
resetMetastore();
testUpgradePartitionTable("parquet");
@@ -96,16 +97,33 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql("CREATE DATABASE my_database");
+ tEnv.executeSql("USE my_database");
+
+ // write data into my_database.hivetable1
+ tEnv.executeSql(
+ "CREATE TABLE hivetable1 (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable1 VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable1");
+
+ // write data into my_database.hivetable2
tEnv.executeSql(
- "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ "CREATE TABLE hivetable2 (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
- tEnv.executeSql("SHOW CREATE TABLE hivetable");
+ tEnv.executeSql("INSERT INTO hivetable2 VALUES" + data(100)).await();
+
+ tEnv.executeSql("SHOW CREATE TABLE hivetable2");
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ List<Row> r1 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable1").collect());
+ List<Row> r3 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable2").collect());
tEnv.executeSql(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
@@ -115,13 +133,19 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
+ "')");
tEnv.useCatalog("PAIMON");
tEnv.executeSql(
- "CALL sys.migrate_table('hive', 'default.hivetable',
'file.format="
+ "CALL sys.migrate_database('hive', 'my_database',
'file.format="
+ format
+ "')")
.await();
- List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ List<Row> r2 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable1").collect());
+ List<Row> r4 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable2").collect());
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ Assertions.assertThatList(r3).containsExactlyInAnyOrderElementsOf(r4);
}
public void testUpgradeNonPartitionTable(String format) throws Exception {
@@ -132,14 +156,31 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
- tEnv.executeSql("CREATE TABLE hivetable (id string, id2 int, id3 int)
STORED AS " + format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
- tEnv.executeSql("SHOW CREATE TABLE hivetable");
+ tEnv.executeSql("CREATE DATABASE my_database");
+ tEnv.executeSql("USE my_database");
+
+ // write data into my_database.hivetable1
+ tEnv.executeSql(
+ "CREATE TABLE hivetable1 (id string, id2 int, id3 int) STORED
AS " + format);
+ tEnv.executeSql("INSERT INTO hivetable1 VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable1");
+
+ // write data into my_database.hivetable2
+ tEnv.executeSql(
+ "CREATE TABLE hivetable2 (id string, id2 int, id3 int) STORED
AS " + format);
+ tEnv.executeSql("INSERT INTO hivetable2 VALUES" + data(100)).await();
+
+ tEnv.executeSql("SHOW CREATE TABLE hivetable2");
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ List<Row> r1 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable1").collect());
+ List<Row> r3 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable2").collect());
tEnv.executeSql(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
@@ -149,18 +190,24 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
+ "')");
tEnv.useCatalog("PAIMON");
tEnv.executeSql(
- "CALL sys.migrate_table('hive', 'default.hivetable',
'file.format="
+ "CALL sys.migrate_database('hive', 'my_database',
'file.format="
+ format
+ "')")
.await();
- List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ List<Row> r2 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable1").collect());
+ List<Row> r4 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable2").collect());
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ Assertions.assertThatList(r3).containsExactlyInAnyOrderElementsOf(r4);
}
@ParameterizedTest
@ValueSource(strings = {"orc", "parquet", "avro"})
- public void testMigrateAction(String format) throws Exception {
+ public void testMigrateDatabaseAction(String format) throws Exception {
StreamExecutionEnvironment env = buildDefaultEnv(false);
TableEnvironment tEnv =
@@ -168,27 +215,45 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql("CREATE DATABASE my_database");
+ tEnv.executeSql("USE my_database");
+
+ // write data into my_database.hivetable1
+ tEnv.executeSql(
+ "CREATE TABLE hivetable1 (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable1 VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable1");
+
+ // write data into my_database.hivetable2
tEnv.executeSql(
- "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ "CREATE TABLE hivetable2 (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ format);
- tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
- tEnv.executeSql("SHOW CREATE TABLE hivetable");
+ tEnv.executeSql("INSERT INTO hivetable2 VALUES" + data(100)).await();
+
+ tEnv.executeSql("SHOW CREATE TABLE hivetable2");
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
tEnv.useCatalog("PAIMON_GE");
- List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ List<Row> r1 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable1").collect());
+ List<Row> r3 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable2").collect());
+
Map<String, String> catalogConf = new HashMap<>();
catalogConf.put("metastore", "hive");
catalogConf.put("uri", "thrift://localhost:" + PORT);
- MigrateTableAction migrateTableAction =
- new MigrateTableAction(
+ MigrateDatabaseAction migrateDatabaseAction =
+ new MigrateDatabaseAction(
"hive",
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
- "default.hivetable",
+ "my_database",
catalogConf,
"");
- migrateTableAction.run();
+ migrateDatabaseAction.run();
tEnv.executeSql(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
@@ -197,27 +262,14 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ "')");
tEnv.useCatalog("PAIMON");
- List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ List<Row> r2 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable1").collect());
+ List<Row> r4 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
my_database.hivetable2").collect());
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
- }
-
- private String data(int i) {
- Random random = new Random();
- StringBuilder stringBuilder = new StringBuilder();
- for (int m = 0; m < i; m++) {
- stringBuilder.append("(");
- stringBuilder.append("\"");
- stringBuilder.append('a' + m);
- stringBuilder.append("\",");
- stringBuilder.append(random.nextInt(10));
- stringBuilder.append(",");
- stringBuilder.append(random.nextInt(10));
- stringBuilder.append(")");
- if (m != i - 1) {
- stringBuilder.append(",");
- }
- }
- return stringBuilder.toString();
+ Assertions.assertThatList(r3).containsExactlyInAnyOrderElementsOf(r4);
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index daea65cf2..9c727ba1e 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -76,7 +76,7 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
}
@Test
- public void testParquetNonPartitionTable() throws Exception {
+ public void testParquet() throws Exception {
testUpgradeNonPartitionTable("parquet");
resetMetastore();
testUpgradePartitionTable("parquet");
@@ -202,7 +202,7 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
- private String data(int i) {
+ protected static String data(int i) {
Random random = new Random();
StringBuilder stringBuilder = new StringBuilder();
for (int m = 0; m < i; m++) {