This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3d3f7ba370 [Feature][E2E][Kerberos] Support for Kerberos in e2e
(#8108)
3d3f7ba370 is described below
commit 3d3f7ba370090a75ba562d6fce11727c6715b599
Author: Jast <[email protected]>
AuthorDate: Mon Nov 25 14:35:12 2024 +0800
[Feature][E2E][Kerberos] Support for Kerberos in e2e (#8108)
---
docs/en/connector-v2/sink/Hive.md | 23 ++
docs/en/connector-v2/source/Hive.md | 24 ++
.../e2e/connector/hive/HiveContainer.java | 51 +++-
.../seatunnel/e2e/connector/hive/HiveIT.java | 1 +
.../e2e/connector/hive/HiveKerberosIT.java | 339 +++++++++++++++++++++
.../fake_to_hive_on_hdfs_with_kerberos.conf | 62 ++++
.../hive_on_hdfs_to_assert_with_kerberos.conf | 77 +++++
.../src/test/resources/kerberos/core-site.xml | 29 ++
.../src/test/resources/kerberos/hive-site.xml | 84 +++++
.../src/test/resources/kerberos/krb5.conf | 33 ++
.../src/test/resources/kerberos/krb5_local.conf | 33 ++
.../e2e/common/container/TestContainer.java | 2 +
.../flink/AbstractTestFlinkContainer.java | 6 +
.../ConnectorPackageServiceContainer.java | 5 +
.../container/seatunnel/SeaTunnelContainer.java | 20 ++
.../spark/AbstractTestSparkContainer.java | 6 +
16 files changed, 789 insertions(+), 6 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index 147fd766a9..df5b493884 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -182,6 +182,29 @@ sink {
}
```
+### Example 2: Kerberos
+
+```bash
+sink {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/[email protected]"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
+```
+
+Description:
+
+- `hive_site_path`: The path to the `hive-site.xml` file.
+- `kerberos_principal`: The principal for Kerberos authentication.
+- `kerberos_keytab_path`: The keytab file path for Kerberos authentication.
+- `krb5_path`: The path to the `krb5.conf` file used for Kerberos
authentication.
+
+
## Hive on s3
### Step 1
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index 5669906c3b..6667ccc8ee 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -138,6 +138,30 @@ Source plugin common parameters, please refer to [Source
Common Options](../sour
```
+### Example 3: Kerberos
+
+```bash
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ result_table_name = hive_source
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/[email protected]"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
+```
+
+Description:
+
+- `hive_site_path`: The path to the `hive-site.xml` file.
+- `kerberos_principal`: The principal for Kerberos authentication.
+- `kerberos_keytab_path`: The keytab file path for Kerberos authentication.
+- `krb5_path`: The path to the `krb5.conf` file used for Kerberos
authentication.
+
## Hive on s3
### Step 1
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
index 486ad0b8b6..cadf95c110 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
@@ -17,9 +17,13 @@
package org.apache.seatunnel.e2e.connector.hive;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -28,6 +32,7 @@ import
org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
@@ -70,24 +75,58 @@ public class HiveContainer extends
GenericContainer<HiveContainer> {
return String.format("thrift://%s:%s", getHost(),
getMappedPort(HMS_PORT));
}
- public String getHiveJdbcUri() {
- return String.format(
- "jdbc:hive2://%s:%s/default", getHost(),
getMappedPort(HIVE_SERVER_PORT));
+ public String getHiveJdbcUri(boolean enableKerberos) {
+ if (enableKerberos) {
+ return String.format(
+
"jdbc:hive2://%s:%s/default;principal=hive/[email protected]",
+ getHost(), getMappedPort(HIVE_SERVER_PORT));
+ } else {
+ return String.format(
+ "jdbc:hive2://%s:%s/default", getHost(),
getMappedPort(HIVE_SERVER_PORT));
+ }
}
public HiveMetaStoreClient createMetaStoreClient() throws MetaException {
+ return this.createMetaStoreClient(false);
+ }
+
+ public HiveMetaStoreClient createMetaStoreClient(boolean enableKerberos)
throws MetaException {
HiveConf conf = new HiveConf();
conf.set("hive.metastore.uris", getMetastoreUri());
-
+ if (enableKerberos) {
+ conf.addResource("kerberos/hive-site.xml");
+ }
return new HiveMetaStoreClient(conf);
}
public Connection getConnection()
throws ClassNotFoundException, InstantiationException,
IllegalAccessException,
SQLException {
- Driver driver = loadHiveJdbcDriver();
+ return getConnection(false);
+ }
- return driver.connect(getHiveJdbcUri(), getJdbcConnectionConfig());
+ public Connection getConnection(boolean enableKerberos)
+ throws ClassNotFoundException, InstantiationException,
IllegalAccessException,
+ SQLException {
+ Driver driver = loadHiveJdbcDriver();
+ if (!enableKerberos) {
+ return driver.connect(getHiveJdbcUri(false),
getJdbcConnectionConfig());
+ }
+ Configuration authConf = new Configuration();
+ authConf.set("hadoop.security.authentication", "kerberos");
+ Configuration configuration = new Configuration();
+ System.setProperty(
+ "java.security.krb5.conf",
+
ContainerUtil.getResourcesFile("/kerberos/krb5_local.conf").getPath());
+ configuration.set("hadoop.security.authentication", "KERBEROS");
+ try {
+ UserGroupInformation.setConfiguration(configuration);
+ UserGroupInformation.loginUserFromKeytab(
+ "hive/[email protected]",
"/tmp/hive.keytab");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return driver.connect(getHiveJdbcUri(true), getJdbcConnectionConfig());
}
public Driver loadHiveJdbcDriver()
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
index 5973d69758..bfa83dfb3b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -180,6 +180,7 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
.await()
.atMost(360, TimeUnit.SECONDS)
.pollDelay(Duration.ofSeconds(10L))
+ .pollInterval(Duration.ofSeconds(3L))
.untilAsserted(this::initializeConnection);
prepareTable();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
new file mode 100644
index 0000000000..c2fca452fa
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hive;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK})
+@Slf4j
+public class HiveKerberosIT extends SeaTunnelContainer {
+
+ // It is necessary to set up a separate network with a fixed name,
otherwise network issues may
+ // cause Kerberos authentication failure
+ Network NETWORK =
+ Network.builder()
+ .createNetworkCmdModifier(cmd -> cmd.withName("SEATUNNEL"))
+ .enableIpv6(false)
+ .build();
+
+ private static final String CREATE_SQL =
+ "CREATE TABLE test_hive_sink_on_hdfs_with_kerberos"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
+
+ private static final String HMS_HOST = "metastore";
+ private static final String HIVE_SERVER_HOST = "hiveserver2";
+ private GenericContainer<?> kerberosContainer;
+ private static final String KERBEROS_IMAGE_NAME =
"zhangshenghang/kerberos-server:1.0";
+
+ private String hiveExeUrl() {
+ return
"https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar";
+ }
+
+ private String libFb303Url() {
+ return
"https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar";
+ }
+
+ private String hadoopAwsUrl() {
+ return
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+ }
+
+ private String aliyunSdkOssUrl() {
+ return
"https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.4.1/aliyun-sdk-oss-3.4.1.jar";
+ }
+
+ private String jdomUrl() {
+ return "https://repo1.maven.org/maven2/org/jdom/jdom/1.1/jdom-1.1.jar";
+ }
+
+ private String hadoopAliyunUrl() {
+ return
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.1.4/hadoop-aliyun-3.1.4.jar";
+ }
+
+ private String hadoopCosUrl() {
+ return
"https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
+ }
+
+ private HiveContainer hiveServerContainer;
+ private HiveContainer hmsContainer;
+ private Connection hiveConnection;
+ private String pluginHiveDir = "/tmp/seatunnel/plugins/Hive/lib";
+
+ protected void downloadHivePluginJar() throws IOException,
InterruptedException {
+ Container.ExecResult downloadHiveExeCommands =
+ server.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p "
+ + pluginHiveDir
+ + " && cd "
+ + pluginHiveDir
+ + " && wget "
+ + hiveExeUrl());
+ Assertions.assertEquals(
+ 0, downloadHiveExeCommands.getExitCode(),
downloadHiveExeCommands.getStderr());
+ Container.ExecResult downloadLibFb303Commands =
+ server.execInContainer(
+ "sh", "-c", "cd " + pluginHiveDir + " && wget " +
libFb303Url());
+ Assertions.assertEquals(
+ 0, downloadLibFb303Commands.getExitCode(),
downloadLibFb303Commands.getStderr());
+ // The jar of s3
+ Container.ExecResult downloadS3Commands =
+ server.execInContainer(
+ "sh", "-c", "cd " + pluginHiveDir + " && wget " +
hadoopAwsUrl());
+ Assertions.assertEquals(
+ 0, downloadS3Commands.getExitCode(),
downloadS3Commands.getStderr());
+ // The jar of oss
+ Container.ExecResult downloadOssCommands =
+ server.execInContainer(
+ "sh",
+ "-c",
+ "cd "
+ + pluginHiveDir
+ + " && wget "
+ + aliyunSdkOssUrl()
+ + " && wget "
+ + jdomUrl()
+ + " && wget "
+ + hadoopAliyunUrl());
+ Assertions.assertEquals(
+ 0, downloadOssCommands.getExitCode(),
downloadOssCommands.getStderr());
+ // The jar of cos
+ Container.ExecResult downloadCosCommands =
+ server.execInContainer(
+ "sh", "-c", "cd " + pluginHiveDir + " && wget " +
hadoopCosUrl());
+ Assertions.assertEquals(
+ 0, downloadCosCommands.getExitCode(),
downloadCosCommands.getStderr());
+ };
+
+ @BeforeEach
+ @Override
+ public void startUp() throws Exception {
+
+ kerberosContainer =
+ new GenericContainer<>(KERBEROS_IMAGE_NAME)
+ .withNetwork(NETWORK)
+ .withExposedPorts(88, 749)
+ .withCreateContainerCmdModifier(cmd ->
cmd.withHostName("kerberos"))
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger(KERBEROS_IMAGE_NAME)));
+ kerberosContainer.setPortBindings(Arrays.asList("88/udp:88/udp",
"749:749"));
+ Startables.deepStart(Stream.of(kerberosContainer)).join();
+ log.info("Kerberos just started");
+
+ // Copy the keytab file from kerberos container to local
+ given().ignoreExceptions()
+ .await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollDelay(Duration.ofSeconds(1L))
+ .untilAsserted(
+ () ->
+ kerberosContainer.copyFileFromContainer(
+ "/tmp/hive.keytab",
"/tmp/hive.keytab"));
+
+ hmsContainer =
+ HiveContainer.hmsStandalone()
+ .withCreateContainerCmdModifier(cmd ->
cmd.withName(HMS_HOST))
+ .withNetwork(NETWORK)
+ .withFileSystemBind(
+
ContainerUtil.getResourcesFile("/kerberos/krb5.conf").getPath(),
+ "/etc/krb5.conf")
+ .withFileSystemBind("/tmp/hive.keytab",
"/tmp/hive.keytab")
+ .withFileSystemBind(
+
ContainerUtil.getResourcesFile("/kerberos/hive-site.xml").getPath(),
+ "/opt/hive/conf/hive-site.xml")
+ .withFileSystemBind(
+
ContainerUtil.getResourcesFile("/kerberos/core-site.xml").getPath(),
+ "/opt/hive/conf/core-site.xml")
+ .withNetworkAliases(HMS_HOST);
+ hmsContainer.setPortBindings(Collections.singletonList("9083:9083"));
+
+ Startables.deepStart(Stream.of(hmsContainer)).join();
+ log.info("HMS just started");
+
+ hiveServerContainer =
+ HiveContainer.hiveServer()
+ .withNetwork(NETWORK)
+ .withCreateContainerCmdModifier(cmd ->
cmd.withName(HIVE_SERVER_HOST))
+ .withNetworkAliases(HIVE_SERVER_HOST)
+ .withFileSystemBind(
+
ContainerUtil.getResourcesFile("/kerberos/krb5.conf").getPath(),
+ "/etc/krb5.conf")
+ .withFileSystemBind("/tmp/hive.keytab",
"/tmp/hive.keytab")
+ .withFileSystemBind(
+
ContainerUtil.getResourcesFile("/kerberos/hive-site.xml").getPath(),
+ "/opt/hive/conf/hive-site.xml")
+ .withFileSystemBind(
+
ContainerUtil.getResourcesFile("/kerberos/core-site.xml").getPath(),
+ "/opt/hive/conf/core-site.xml")
+ .withFileSystemBind("/tmp/data", "/opt/hive/data")
+ // If there are any issues, you can open the kerberos
debug log to view
+ // more information: -Dsun.security.krb5.debug=true
+ .withEnv("SERVICE_OPTS",
"-Dhive.metastore.uris=thrift://metastore:9083")
+ .withEnv("IS_RESUME", "true")
+ .dependsOn(hmsContainer);
+
hiveServerContainer.setPortBindings(Collections.singletonList("10000:10000"));
+
+ Startables.deepStart(Stream.of(hiveServerContainer)).join();
+
+ log.info("HiveServer2 just started");
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(3600, TimeUnit.SECONDS)
+ .pollDelay(Duration.ofSeconds(10L))
+ .pollInterval(Duration.ofSeconds(3L))
+ .untilAsserted(this::initializeConnection);
+
+ prepareTable();
+
+ // Set the fixed network to SeatunnelContainer
+ super.startUp(this.NETWORK);
+ // Load the hive plugin jar
+ this.downloadHivePluginJar();
+ }
+
+ @AfterEach
+ @Override
+ public void tearDown() throws Exception {
+ if (hmsContainer != null) {
+ log.info(hmsContainer.execInContainer("cat",
"/tmp/hive/hive.log").getStdout());
+ hmsContainer.close();
+ }
+ if (hiveServerContainer != null) {
+ log.info(hiveServerContainer.execInContainer("cat",
"/tmp/hive/hive.log").getStdout());
+ hiveServerContainer.close();
+ }
+ }
+
+ private void initializeConnection()
+ throws ClassNotFoundException, InstantiationException,
IllegalAccessException,
+ SQLException {
+ this.hiveConnection = this.hiveServerContainer.getConnection(true);
+ }
+
+ private void prepareTable() throws Exception {
+ log.info(
+ String.format(
+ "Databases are %s",
+
this.hmsContainer.createMetaStoreClient(true).getAllDatabases()));
+ try (Statement statement = this.hiveConnection.createStatement()) {
+ statement.execute(CREATE_SQL);
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw exception;
+ }
+ }
+
+ private void executeJob(TestContainer container, String job1, String job2)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob(job1);
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Container.ExecResult readResult = container.executeJob(job2);
+ Assertions.assertEquals(0, readResult.getExitCode());
+ }
+
+ @Test
+ public void testFakeSinkHiveOnHDFS() throws Exception {
+ copyAbsolutePathToContainer("/tmp/hive.keytab", "/tmp/hive.keytab");
+ copyFileToContainer("/kerberos/krb5.conf", "/tmp/krb5.conf");
+ copyFileToContainer("/kerberos/hive-site.xml", "/tmp/hive-site.xml");
+
+ Container.ExecResult fakeToHiveWithKerberosResult =
+ executeJob("/fake_to_hive_on_hdfs_with_kerberos.conf");
+ Assertions.assertEquals(0, fakeToHiveWithKerberosResult.getExitCode());
+
+ Container.ExecResult hiveToAssertWithKerberosResult =
+ executeJob("/hive_on_hdfs_to_assert_with_kerberos.conf");
+ Assertions.assertEquals(0,
hiveToAssertWithKerberosResult.getExitCode());
+
+ Container.ExecResult fakeToHiveResult =
executeJob("/fake_to_hive_on_hdfs.conf");
+ Assertions.assertEquals(1, fakeToHiveResult.getExitCode());
+ Assertions.assertTrue(
+ fakeToHiveResult
+ .getStderr()
+ .contains("Get hive table information from hive
metastore service failed"));
+
+ Container.ExecResult hiveToAssertResult =
executeJob("/hive_on_hdfs_to_assert.conf");
+ Assertions.assertEquals(1, hiveToAssertResult.getExitCode());
+ Assertions.assertTrue(
+ hiveToAssertResult
+ .getStderr()
+ .contains("Get hive table information from hive
metastore service failed"));
+ }
+
+ @TestTemplate
+ @Disabled(
+ "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this
test, please set up your own environment in the test case file,
hadoop_hive_conf_path_local and ip below}")
+ public void testFakeSinkHiveOnS3(TestContainer container) throws Exception
{
+ executeJob(container, "/fake_to_hive_on_s3.conf",
"/hive_on_s3_to_assert.conf");
+ }
+
+ @TestTemplate
+ @Disabled(
+ "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this
test, please set up your own environment in the test case file,
hadoop_hive_conf_path_local and ip below}")
+ public void testFakeSinkHiveOnOSS(TestContainer container) throws
Exception {
+ executeJob(container, "/fake_to_hive_on_oss.conf",
"/hive_on_oss_to_assert.conf");
+ }
+
+ @TestTemplate
+ @Disabled(
+ "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this
test, please set up your own environment in the test case file,
hadoop_hive_conf_path_local and ip below}")
+ public void testFakeSinkHiveOnCos(TestContainer container) throws
Exception {
+ executeJob(container, "/fake_to_hive_on_cos.conf",
"/hive_on_cos_to_assert.conf");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
new file mode 100644
index 0000000000..d74b396a62
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/[email protected]"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
new file mode 100644
index 0000000000..59c768e4fb
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ result_table_name = hive_source
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/[email protected]"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
new file mode 100644
index 0000000000..ed5df9b0f5
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+ <property>
+ <name>hadoop.security.authorization</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hadoop.security.authentication</name>
+ <value>kerberos</value>
+ </property>
+</configuration>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
new file mode 100644
index 0000000000..2dd37b5256
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <property>
+ <name>hive.server2.authentication</name>
+ <value>KERBEROS</value>
+ </property>
+ <property>
+ <name>hive.server2.authentication.kerberos.principal</name>
+ <value>hive/[email protected]</value>
+ </property>
+ <property>
+ <name>hive.server2.authentication.kerberos.keytab</name>
+ <value>/tmp/hive.keytab</value>
+ </property>
+ <property>
+ <name>hive.security.authenticator.manager</name>
+
<value>org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator</value>
+ </property>
+ <property>
+ <name>hive.metastore.sasl.enabled</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hive.metastore.kerberos.keytab.file</name>
+ <value>/tmp/hive.keytab</value>
+ </property>
+ <property>
+ <name>hive.metastore.kerberos.principal</name>
+ <value>hive/[email protected]</value>
+ </property>
+ <property>
+ <name>hive.exec.scratchdir</name>
+ <value>/opt/hive/scratch_dir</value>
+ </property>
+ <property>
+ <name>hive.user.install.directory</name>
+ <value>/opt/hive/install_dir</value>
+ </property>
+ <property>
+ <name>tez.runtime.optimize.local.fetch</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hive.exec.submit.local.task.via.child</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>mapreduce.framework.name</name>
+ <value>local</value>
+ </property>
+ <property>
+ <name>tez.local.mode</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hive.execution.engine</name>
+ <value>tez</value>
+ </property>
+ <property>
+ <name>metastore.warehouse.dir</name>
+ <value>/opt/hive/data/warehouse</value>
+ </property>
+ <property>
+ <name>metastore.metastore.event.db.notification.api.auth</name>
+ <value>false</value>
+ </property>
+</configuration>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
new file mode 100755
index 0000000000..2b09b1c3e5
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[libdefaults]
+ default_realm = EXAMPLE.COM
+ dns_lookup_realm = true
+ dns_lookup_kdc = true
+ ticket_lifetime = 24h
+ forwardable = true
+
+[realms]
+ EXAMPLE.COM = {
+ kdc = kerberos:88
+ admin_server = kerberos:749
+ }
+
+[domain_realm]
+ .example.com = EXAMPLE.COM
+ example.com = EXAMPLE.COM
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
new file mode 100755
index 0000000000..bd136e9e8d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[libdefaults]
+ default_realm = EXAMPLE.COM
+ dns_lookup_realm = true
+ dns_lookup_kdc = true
+ ticket_lifetime = 24h
+ forwardable = true
+
+[realms]
+ EXAMPLE.COM = {
+ kdc = localhost:88
+ admin_server = localhost:749
+ }
+
+[domain_realm]
+ .example.com = EXAMPLE.COM
+ example.com = EXAMPLE.COM
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index fd49c7b46e..f815ecb6b2 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -80,4 +80,6 @@ public interface TestContainer extends TestResource {
String getServerLogs();
void copyFileToContainer(String path, String targetPath);
+
+ void copyAbsolutePathToContainer(String path, String targetPath);
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 47b3de5ff5..007a5de4c7 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -34,6 +34,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -180,4 +181,9 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath,
jobManager);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, jobManager);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
index ea8bcd8788..54804d1057 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
@@ -245,4 +245,9 @@ public class ConnectorPackageServiceContainer extends
AbstractTestContainer {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath,
server1);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, server1);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index b9ff54b6c3..51947c9e9d 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -39,6 +39,7 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -86,7 +87,21 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
server = createSeaTunnelServer();
}
+ /**
+ * Start up the seatunnel server with the given network.
+ *
+ * @param NETWORK the network to use
+ */
+ public void startUp(Network NETWORK) throws Exception {
+ server = createSeaTunnelServer(NETWORK);
+ }
+
private GenericContainer<?> createSeaTunnelServer() throws IOException,
InterruptedException {
+ return createSeaTunnelServer(NETWORK);
+ }
+
+ private GenericContainer<?> createSeaTunnelServer(Network NETWORK)
+ throws IOException, InterruptedException {
GenericContainer<?> server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
@@ -523,4 +538,9 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath,
server);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, server);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index b13851582c..d6c08f1231 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -31,6 +31,7 @@ import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -131,4 +132,9 @@ public abstract class AbstractTestSparkContainer extends
AbstractTestContainer {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath,
master);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, master);
+ }
}