This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e2a4772933 [Feature][Connector-v2] Support S3 filesystem of paimon
connector (#8036)
e2a4772933 is described below
commit e2a477293303161e064d6197fe82b77de375f5ef
Author: dailai <[email protected]>
AuthorDate: Thu Nov 14 20:27:43 2024 +0800
[Feature][Connector-v2] Support S3 filesystem of paimon connector (#8036)
---
docs/en/connector-v2/sink/Paimon.md | 54 +++++++++
docs/en/connector-v2/source/Paimon.md | 32 +++++
docs/zh/connector-v2/sink/Paimon.md | 54 ++++++++-
seatunnel-connectors-v2/connector-paimon/pom.xml | 34 ++++++
.../paimon/catalog/PaimonCatalogLoader.java | 9 +-
.../seatunnel/paimon/filesystem/S3Loader.java | 47 ++++++++
.../services/org.apache.paimon.fs.FileIOLoader | 16 +++
.../connector-paimon-e2e/pom.xml | 35 +++++-
.../e2e/connector/paimon/PaimonWithS3IT.java | 130 +++++++++++++++++++++
.../src/test/resources/fake_to_paimon_with_s3.conf | 95 +++++++++++++++
.../test/resources/paimon_with_s3_to_assert.conf | 98 ++++++++++++++++
.../container/seatunnel/SeaTunnelContainer.java | 2 +-
12 files changed, 596 insertions(+), 10 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index c9e4b3a9b6..68c0755cfd 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -61,6 +61,13 @@ All `changelog-producer` modes are currently supported. The
default is `none`.
> note:
> When you use a streaming mode to read paimon table,different mode will
> produce [different
> results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。
+## Filesystems
+The Paimon connector supports writing data to multiple file systems.
Currently, the supported file systems are hdfs and s3.
+If you use the s3 filesystem. You can configure the
`fs.s3a.access-key`、`fs.s3a.secret-key`、`fs.s3a.endpoint`、`fs.s3a.path.style.access`、`fs.s3a.aws.credentials.provider`
properties in the `paimon.hadoop.conf` option.
+Besides, the warehouse should start with `s3a://`.
+
+
+
## Examples
### Single table
@@ -94,6 +101,53 @@ sink {
}
```
+### Single table with s3 filesystem
+
+```hocon
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "s3a://test/"
+ database = "seatunnel_namespace11"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.s3a.access-key=G52pnxg67819khOZ9ezX
+ fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
+ fs.s3a.endpoint="http://minio4:9000"
+ fs.s3a.path.style.access=true
+
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ }
+ }
+}
+```
+
### Single table(Specify hadoop HA config and kerberos config)
```hocon
diff --git a/docs/en/connector-v2/source/Paimon.md
b/docs/en/connector-v2/source/Paimon.md
index e586a4fd9d..cbe3b592f8 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -82,6 +82,11 @@ Properties in hadoop conf
The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
+## Filesystems
+The Paimon connector supports writing data to multiple file systems.
Currently, the supported file systems are hdfs and s3.
+If you use the s3 filesystem. You can configure the
`fs.s3a.access-key`、`fs.s3a.secret-key`、`fs.s3a.endpoint`、`fs.s3a.path.style.access`、`fs.s3a.aws.credentials.provider`
properties in the `paimon.hadoop.conf` option.
+Besides, the warehouse should start with `s3a://`.
+
## Examples
### Simple example
@@ -109,6 +114,33 @@ source {
}
```
+### S3 example
+```hocon
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Paimon {
+ warehouse = "s3a://test/"
+ database = "seatunnel_namespace11"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.s3a.access-key=G52pnxg67819khOZ9ezX
+ fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
+ fs.s3a.endpoint="http://minio4:9000"
+ fs.s3a.path.style.access=true
+
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ }
+ }
+}
+
+sink {
+ Console{}
+}
+```
+
### Hadoop conf example
```hocon
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index 375c8c90ca..09f4e63fbf 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -58,7 +58,12 @@
Paimon表的changelog产生模式有[四种](https://paimon.apache.org/docs/mast
*
[`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
*
[`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
> 注意:
- >
当你使用流模式去读paimon表的数据时,不同模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。
+>
当你使用流模式去读paimon表的数据时,不同模式将会产生[不同的结果](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)。
+
+## 文件系统
+Paimon连接器支持向多文件系统写入数据。目前支持的文件系统有hdfs和s3。
+如果您使用s3文件系统。您可以配置`fs.s3a.access-key `, `fs.s3a.secret-key`, `fs.s3a.endpoint`,
`fs.s3a.path.style.access`,
`fs.s3a.aws.credentials`。在`paimon.hadoop.conf`选项中设置提供程序的属性。
+除此之外,warehouse应该以`s3a://`开头。
## 示例
@@ -93,6 +98,53 @@ sink {
}
```
+### 单表(基于S3文件系统)
+
+```hocon
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "s3a://test/"
+ database = "seatunnel_namespace11"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.s3a.access-key=G52pnxg67819khOZ9ezX
+ fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
+ fs.s3a.endpoint="http://minio4:9000"
+ fs.s3a.path.style.access=true
+
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ }
+ }
+}
+```
+
### 单表(指定hadoop HA配置和kerberos配置)
```hocon
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml
b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 80934e68a2..0cd3f535d0 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -32,6 +32,7 @@
<properties>
<paimon.version>0.7.0-incubating</paimon.version>
<hive.version>2.3.9</hive.version>
+ <connector.name>connector.paimon</connector.name>
</properties>
<dependencies>
@@ -47,6 +48,12 @@
<version>${paimon.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-s3-impl</artifactId>
+ <version>${paimon.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-guava</artifactId>
@@ -98,4 +105,31 @@
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <filters>
+ <filter>
+
<artifact>org.apache.paimon:paimon-s3-impl</artifact>
+ <excludes>
+ <exclude>org/apache/hadoop/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
index 774576c408..ae1f6d675a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -44,6 +44,7 @@ public class PaimonCatalogLoader implements Serializable {
private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
private static final String HDFS_PREFIX = "hdfs://";
+ private static final String S3A_PREFIX = "s3a://";
/** ********* Hdfs constants ************* */
private static final String HDFS_IMPL =
"org.apache.hadoop.hdfs.DistributedFileSystem";
@@ -63,7 +64,7 @@ public class PaimonCatalogLoader implements Serializable {
}
public Catalog loadCatalog() {
- // When using the seatunel engine, set the current class loader to
prevent loading failures
+ // When using the seatunnel engine, set the current class loader to
prevent loading failures
Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
final Map<String, String> optionsMap = new HashMap<>(1);
optionsMap.put(CatalogOptions.WAREHOUSE.key(), warehouse);
@@ -71,12 +72,12 @@ public class PaimonCatalogLoader implements Serializable {
if (warehouse.startsWith(HDFS_PREFIX)) {
checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
+ } else if (warehouse.startsWith(S3A_PREFIX)) {
+
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
}
if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
optionsMap.put(CatalogOptions.URI.key(), catalogUri);
- paimonHadoopConfiguration
- .getPropsWithPrefix(StringUtils.EMPTY)
- .forEach((k, v) -> optionsMap.put(k, v));
+
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
}
final Options options = Options.fromMap(optionsMap);
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java
new file mode 100644
index 0000000000..915070c8ea
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.filesystem;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.s3.S3FileIO;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class S3Loader implements FileIOLoader {
+ @Override
+ public String getScheme() {
+ return "s3a";
+ }
+
+ @Override
+ public List<String[]> requiredOptions() {
+ List<String[]> options = new ArrayList<>();
+ options.add(new String[] {"fs.s3a.access-key", "fs.s3a.access.key"});
+ options.add(new String[] {"fs.s3a.secret-key", "fs.s3a.secret.key"});
+ options.add(new String[] {"fs.s3a.endpoint", "fs.s3a.endpoint"});
+ return options;
+ }
+
+ @Override
+ public FileIO load(Path path) {
+ return new S3FileIO();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
b/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
new file mode 100644
index 0000000000..0057f40425
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.seatunnel.connectors.seatunnel.paimon.filesystem.S3Loader
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
index 69ea9a9f74..71784966f8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
@@ -25,17 +25,32 @@
<artifactId>connector-paimon-e2e</artifactId>
<name>SeaTunnel : E2E : Connector V2 : Paimon</name>
+ <properties>
+ <testcontainer.version>1.19.1</testcontainer.version>
+ <minio.version>8.5.6</minio.version>
+ </properties>
+
<dependencies>
+ <!-- minio containers -->
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-fake</artifactId>
- <version>${project.version}</version>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>minio</artifactId>
+ <version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.minio</groupId>
+ <artifactId>minio</artifactId>
+ <version>${minio.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-paimon</artifactId>
+ <artifactId>connector-seatunnel-e2e-base</artifactId>
<version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
@@ -44,6 +59,18 @@
<classifier>optional</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-paimon</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
new file mode 100644
index 0000000000..2df1a5e49b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MinIOContainer;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+
+import java.nio.file.Paths;
+import java.util.Map;
+
+public class PaimonWithS3IT extends SeaTunnelContainer {
+
+ private static final String MINIO_DOCKER_IMAGE =
"minio/minio:RELEASE.2024-06-13T22-53-53Z";
+ private static final String HOST = "minio";
+ private static final int MINIO_PORT = 9000;
+ private static final String MINIO_USER_NAME = "minio";
+ private static final String MINIO_USER_PASSWORD = "miniominio";
+
+ private static final String BUCKET = "test";
+
+ private MinIOContainer container;
+ private MinioClient minioClient;
+
+ private Map<String, Object> PAIMON_SINK_PROPERTIES;
+
+ protected static final String AWS_SDK_DOWNLOAD =
+
"https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar";
+ protected static final String HADOOP_AWS_DOWNLOAD =
+
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ container =
+ new MinIOContainer(MINIO_DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withUserName(MINIO_USER_NAME)
+ .withPassword(MINIO_USER_PASSWORD)
+ .withExposedPorts(MINIO_PORT);
+ container.start();
+
+ String s3URL = container.getS3URL();
+
+ // configuringClient
+ minioClient =
+ MinioClient.builder()
+ .endpoint(s3URL)
+ .credentials(container.getUserName(),
container.getPassword())
+ .build();
+
+ // create bucket
+
minioClient.makeBucket(MakeBucketArgs.builder().bucket(BUCKET).build());
+
+ BucketExistsArgs existsArgs =
BucketExistsArgs.builder().bucket(BUCKET).build();
+ Assertions.assertTrue(minioClient.bucketExists(existsArgs));
+ super.startUp();
+ }
+
+ @Override
+ @AfterAll
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (container != null) {
+ container.close();
+ }
+ }
+
+ @Override
+ protected String[] buildStartCommand() {
+ return new String[] {
+ "bash",
+ "-c",
+ "wget -P "
+ + SEATUNNEL_HOME
+ + "lib "
+ + AWS_SDK_DOWNLOAD
+ + " &&"
+ + "wget -P "
+ + SEATUNNEL_HOME
+ + "lib "
+ + HADOOP_AWS_DOWNLOAD
+ + " &&"
+ + ContainerUtil.adaptPathForWin(
+ Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL).toString())
+ };
+ }
+
+ @Override
+ protected boolean isIssueWeAlreadyKnow(String threadName) {
+ return super.isIssueWeAlreadyKnow(threadName)
+ // Paimon with s3
+ || threadName.startsWith("s3a-transfer");
+ }
+
+ @Test
+ public void testFaceCDCSinkPaimonWithS3Filesystem() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelJob("/fake_to_paimon_with_s3.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Container.ExecResult readResult =
executeSeaTunnelJob("/paimon_with_s3_to_assert.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
new file mode 100644
index 0000000000..a379a638eb
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "s3a://test/"
+ database = "seatunnel_namespace11"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.s3a.access-key=minio
+ fs.s3a.secret-key=miniominio
+ fs.s3a.endpoint="http://minio:9000"
+ fs.s3a.path.style.access=true
+
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
new file mode 100644
index 0000000000..6684b5fa95
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ Paimon {
+ warehouse = "s3a://test/"
+ database = "seatunnel_namespace11"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.s3a.access-key=minio
+ fs.s3a.secret-key=miniominio
+ fs.s3a.endpoint="http://minio:9000"
+ fs.s3a.path.style.access=true
+
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ],
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 3
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 100
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 5fa5abb7ed..b9ff54b6c3 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -439,7 +439,7 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
}
/** The thread should be recycled but not, we should fix it in the future.
*/
- private boolean isIssueWeAlreadyKnow(String threadName) {
+ protected boolean isIssueWeAlreadyKnow(String threadName) {
// ClickHouse com.clickhouse.client.ClickHouseClientBuilder
return threadName.startsWith("ClickHouseClientWorker")
// InfluxDB okio.AsyncTimeout$Watchdog