This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2657626f93 [Feature][Paimon] Customize the hadoop user  (#8888)
2657626f93 is described below

commit 2657626f938ee1f15ed2b8f7512aeb325a91760a
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Mar 6 10:55:37 2025 +0800

    [Feature][Paimon] Customize the hadoop user  (#8888)
---
 docs/en/connector-v2/sink/Paimon.md                | 43 ++++++++++++++++++++++
 docs/en/connector-v2/source/Paimon.md              |  1 +
 docs/zh/connector-v2/sink/Paimon.md                | 43 ++++++++++++++++++++++
 docs/zh/connector-v2/source/Paimon.md              |  1 +
 .../paimon/catalog/PaimonCatalogLoader.java        |  7 ++++
 .../fake_sink_paimon_truncate_with_hdfs_case1.conf |  1 +
 .../fake_sink_paimon_truncate_with_hdfs_case2.conf |  1 +
 7 files changed, 97 insertions(+)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 68aa63ad03..7e6d7cd8dd 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -246,6 +246,49 @@ sink {
 }
 ```
 
+### Single table(Specify hadoop HA config with hadoop_user_name) 
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+transform {
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
+    paimon.hadoop.conf = {
+      hadoop_user_name = "hdfs"
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+      security.kerberos.login.principal = "your-kerberos-principal"
+      security.kerberos.login.keytab = "your-kerberos-keytab-path"
+    }
+  }
+}
+```
+
 ### Single table(Hive catalog)
 
 ```hocon
diff --git a/docs/en/connector-v2/source/Paimon.md 
b/docs/en/connector-v2/source/Paimon.md
index fd92d0c03e..e10ae2678e 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -152,6 +152,7 @@ source {
     table="st_test"
     query = "select * from st_test where pk_id is not null and pk_id < 3"
     paimon.hadoop.conf = {
+      hadoop_user_name = "hdfs"
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
       dfs.ha.namenodes.nameservice1 = "nn1,nn2"
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index 157c1fa5e8..58ccc85985 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -244,6 +244,49 @@ sink {
 }
 ```
 
+### 单表(指定hadoop HA配置和指定hadoop用户名)
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+    table-names = ["seatunnel.role"]
+  }
+}
+
+transform {
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="seatunnel"
+    table="role"
+    paimon.hadoop.conf = {
+      hadoop_user_name = "hdfs"
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+      security.kerberos.login.principal = "your-kerberos-principal"
+      security.kerberos.login.keytab = "your-kerberos-keytab-path"
+    }
+  }
+}
+```
+
 ### 单表(使用Hive catalog)
 
 ```hocon
diff --git a/docs/zh/connector-v2/source/Paimon.md 
b/docs/zh/connector-v2/source/Paimon.md
index 27d546a138..60b9e81660 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -154,6 +154,7 @@ source {
     table="st_test"
     query = "select * from st_test where pk_id is not null and pk_id < 3"
     paimon.hadoop.conf = {
+      hadoop_user_name = "hdfs"
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
       dfs.ha.namenodes.nameservice1 = "nn1,nn2"
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
index ae1f6d675a..be148074aa 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -25,6 +25,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityC
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -50,6 +51,8 @@ public class PaimonCatalogLoader implements Serializable {
 
     private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
 
+    private static final String HADOOP_USER_NAME = "hadoop_user_name";
+
     private String warehouse;
     private PaimonCatalogEnum catalogType;
     private String catalogUri;
@@ -72,6 +75,10 @@ public class PaimonCatalogLoader implements Serializable {
         if (warehouse.startsWith(HDFS_PREFIX)) {
             checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
             paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
+            String username = paimonHadoopConfiguration.get(HADOOP_USER_NAME);
+            if (StringUtils.isNotBlank(username)) {
+                
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(username));
+            }
         } else if (warehouse.startsWith(S3A_PREFIX)) {
             
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
         }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
index 92f6f5c6de..568a54f804 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
@@ -68,6 +68,7 @@ sink {
     database = "seatunnel_namespace11"
     table = "st_test"
     paimon.hadoop.conf = {
+      hadoop_user_name = "hdfs"
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
       dfs.ha.namenodes.nameservice1 = "nn1,nn2"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
index 1a5eac7322..7f9c453d35 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
@@ -53,6 +53,7 @@ sink {
     table = "st_test"
     data_save_mode=DROP_DATA
     paimon.hadoop.conf = {
+      hadoop_user_name = "hdfs"
       fs.defaultFS = "hdfs://nameservice1"
       dfs.nameservices = "nameservice1"
       dfs.ha.namenodes.nameservice1 = "nn1,nn2"

Reply via email to