This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2657626f93 [Feature][Paimon] Customize the hadoop user (#8888)
2657626f93 is described below
commit 2657626f938ee1f15ed2b8f7512aeb325a91760a
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Mar 6 10:55:37 2025 +0800
[Feature][Paimon] Customize the hadoop user (#8888)
---
docs/en/connector-v2/sink/Paimon.md | 43 ++++++++++++++++++++++
docs/en/connector-v2/source/Paimon.md | 1 +
docs/zh/connector-v2/sink/Paimon.md | 43 ++++++++++++++++++++++
docs/zh/connector-v2/source/Paimon.md | 1 +
.../paimon/catalog/PaimonCatalogLoader.java | 7 ++++
.../fake_sink_paimon_truncate_with_hdfs_case1.conf | 1 +
.../fake_sink_paimon_truncate_with_hdfs_case2.conf | 1 +
7 files changed, 97 insertions(+)
diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index 68aa63ad03..7e6d7cd8dd 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -246,6 +246,49 @@ sink {
}
```
+### Single table(Specify hadoop HA config with hadoop_user_name)
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+transform {
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
+ paimon.hadoop.conf = {
+ hadoop_user_name = "hdfs"
+ fs.defaultFS = "hdfs://nameservice1"
+ dfs.nameservices = "nameservice1"
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+ dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ dfs.client.use.datanode.hostname = "true"
+ security.kerberos.login.principal = "your-kerberos-principal"
+ security.kerberos.login.keytab = "your-kerberos-keytab-path"
+ }
+ }
+}
+```
+
### Single table(Hive catalog)
```hocon
diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md
index fd92d0c03e..e10ae2678e 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -152,6 +152,7 @@ source {
table="st_test"
query = "select * from st_test where pk_id is not null and pk_id < 3"
paimon.hadoop.conf = {
+ hadoop_user_name = "hdfs"
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md
index 157c1fa5e8..58ccc85985 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -244,6 +244,49 @@ sink {
}
```
+### 单表(指定hadoop HA配置和指定hadoop用户名)
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Mysql-CDC {
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+ username = "root"
+ password = "******"
+ table-names = ["seatunnel.role"]
+ }
+}
+
+transform {
+}
+
+sink {
+ Paimon {
+ catalog_name="seatunnel_test"
+ warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
+ database="seatunnel"
+ table="role"
+ paimon.hadoop.conf = {
+ hadoop_user_name = "hdfs"
+ fs.defaultFS = "hdfs://nameservice1"
+ dfs.nameservices = "nameservice1"
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+ dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ dfs.client.use.datanode.hostname = "true"
+ security.kerberos.login.principal = "your-kerberos-principal"
+ security.kerberos.login.keytab = "your-kerberos-keytab-path"
+ }
+ }
+}
+```
+
### 单表(使用Hive catalog)
```hocon
diff --git a/docs/zh/connector-v2/source/Paimon.md b/docs/zh/connector-v2/source/Paimon.md
index 27d546a138..60b9e81660 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -154,6 +154,7 @@ source {
table="st_test"
query = "select * from st_test where pk_id is not null and pk_id < 3"
paimon.hadoop.conf = {
+ hadoop_user_name = "hdfs"
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
index ae1f6d675a..be148074aa 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityC
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -50,6 +51,8 @@ public class PaimonCatalogLoader implements Serializable {
private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+ private static final String HADOOP_USER_NAME = "hadoop_user_name";
+
private String warehouse;
private PaimonCatalogEnum catalogType;
private String catalogUri;
@@ -72,6 +75,10 @@ public class PaimonCatalogLoader implements Serializable {
if (warehouse.startsWith(HDFS_PREFIX)) {
checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
+ String username = paimonHadoopConfiguration.get(HADOOP_USER_NAME);
+ if (StringUtils.isNotBlank(username)) {
+ UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(username));
+ }
} else if (warehouse.startsWith(S3A_PREFIX)) {
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
index 92f6f5c6de..568a54f804 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
@@ -68,6 +68,7 @@ sink {
database = "seatunnel_namespace11"
table = "st_test"
paimon.hadoop.conf = {
+ hadoop_user_name = "hdfs"
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
index 1a5eac7322..7f9c453d35 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
@@ -53,6 +53,7 @@ sink {
table = "st_test"
data_save_mode=DROP_DATA
paimon.hadoop.conf = {
+ hadoop_user_name = "hdfs"
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
dfs.ha.namenodes.nameservice1 = "nn1,nn2"