This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new af2031f823 [hive][clone] check whether source catalog is HiveCatalog
(#5496)
af2031f823 is described below
commit af2031f82376b7a728db324e192a229c391c4637
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Apr 21 14:17:56 2025 +0800
[hive][clone] check whether source catalog is HiveCatalog (#5496)
---
.../org/apache/paimon/flink/action/CloneHiveAction.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
index 25b1c5809f..b418f7a744 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.catalog.CachingCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.clone.hive.CloneFileInfo;
import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
@@ -26,7 +28,7 @@ import
org.apache.paimon.flink.clone.hive.CopyHiveFilesFunction;
import org.apache.paimon.flink.clone.hive.DataFileInfo;
import org.apache.paimon.flink.clone.hive.ListHiveFilesFunction;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
-import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.hive.HiveCatalog;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -61,10 +63,15 @@ public class CloneHiveAction extends ActionBase {
@Nullable Integer parallelism,
@Nullable String whereSql) {
super(sourceCatalogConfig);
- String metastore =
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
- if (!"hive".equals(metastore)) {
+
+ Catalog sourceCatalog = catalog;
+ if (sourceCatalog instanceof CachingCatalog) {
+ sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
+ }
+ if (!(sourceCatalog instanceof HiveCatalog)) {
throw new UnsupportedOperationException(
- "Only support clone hive table. Maybe you forget to set
--catalog_conf metastore=hive ?");
+ "Only support clone hive tables using HiveCatalog, but
current source catalog is "
+ + sourceCatalog.getClass().getName());
}
this.sourceDatabase = sourceDatabase;