This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4f3df22124 [Feature][CONNECTORS-V2-Paimon] Paimon Sink supported
truncate table (#7560)
4f3df22124 is described below
commit 4f3df221247d57e52c75ebceeb0d60aa9c8db661
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Sep 4 12:06:39 2024 +0800
[Feature][CONNECTORS-V2-Paimon] Paimon Sink supported truncate table (#7560)
---
.../seatunnel/paimon/catalog/PaimonCatalog.java | 30 +++++
.../e2e/connector/paimon/PaimonRecord.java | 1 +
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 48 +++++++-
.../e2e/connector/paimon/PaimonSinkHdfsIT.java | 129 +++++++++++++++++++++
.../fake_sink_paimon_truncate_with_hdfs_case1.conf | 80 +++++++++++++
.../fake_sink_paimon_truncate_with_hdfs_case2.conf | 65 +++++++++++
.../fake_sink_paimon_truncate_with_hive_case1.conf | 82 +++++++++++++
.../fake_sink_paimon_truncate_with_hive_case2.conf | 67 +++++++++++
...fake_sink_paimon_truncate_with_local_case1.conf | 71 ++++++++++++
...fake_sink_paimon_truncate_with_local_case2.conf | 56 +++++++++
10 files changed, 626 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index d896e01539..9e09035e2f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
@@ -183,6 +184,35 @@ public class PaimonCatalog implements Catalog, PaimonTable {
}
}
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ Identifier identifier = toIdentifier(tablePath);
+            FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ Schema schema = buildPaimonSchema(table.schema());
+ dropTable(tablePath, ignoreIfNotExists);
+ catalog.createTable(identifier, schema, ignoreIfNotExists);
+ } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+ throw new TableNotExistException(this.catalogName, tablePath);
+        } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
+            throw new DatabaseAlreadyExistException(this.catalogName, tablePath.getDatabaseName());
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
+            throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
+ }
+ }
+
+    private Schema buildPaimonSchema(@NonNull org.apache.paimon.schema.TableSchema schema) {
+ Schema.Builder builder = Schema.newBuilder();
+ schema.fields()
+                .forEach(field -> builder.column(field.name(), field.type(), field.description()));
+ builder.options(schema.options());
+ builder.primaryKey(schema.primaryKeys());
+ builder.partitionKeys(schema.partitionKeys());
+ builder.comment(schema.comment());
+ return builder.build();
+ }
+
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
index 13dcd3d675..700bf25f51 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
@@ -32,6 +32,7 @@ import lombok.NoArgsConstructor;
public class PaimonRecord {
public Long pkId;
public String name;
+ public Integer score;
public String dt;
public Timestamp oneTime;
public Timestamp twoTime;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 4b1d7dd86c..0168cc8f53 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -508,6 +508,43 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
Assertions.assertEquals(0, readResult4.getExitCode());
}
+ @TestTemplate
+    public void testSinkPaimonTruncateTable(TestContainer container) throws Exception {
+ Container.ExecResult writeResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_local_case1.conf");
+ Assertions.assertEquals(0, writeResult.getExitCode());
+ Container.ExecResult readResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_local_case2.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(30L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ // copy paimon to local
+
container.executeExtraCommands(containerExtendedFactory);
+ List<PaimonRecord> paimonRecords =
+ loadPaimonData("seatunnel_namespace10",
TARGET_TABLE);
+ Assertions.assertEquals(2, paimonRecords.size());
+ paimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 1) {
+ Assertions.assertEquals("Aa",
paimonRecord.getName());
+ }
+ if (paimonRecord.getPkId() == 2) {
+ Assertions.assertEquals("Bb",
paimonRecord.getName());
+ }
+ Assertions.assertEquals(200,
paimonRecord.getScore());
+ });
+ List<Long> ids =
+ paimonRecords.stream()
+ .map(PaimonRecord::getPkId)
+ .collect(Collectors.toList());
+ Assertions.assertFalse(ids.contains(3L));
+ });
+ }
+
protected final ContainerExtendedFactory containerExtendedFactory =
container -> {
if (isWindows) {
@@ -568,7 +605,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
}
private List<PaimonRecord> loadPaimonData(String dbName, String tbName)
throws Exception {
- Table table = getTable(dbName, tbName);
+ FileStoreTable table = (FileStoreTable) getTable(dbName, tbName);
ReadBuilder readBuilder = table.newReadBuilder();
TableScan.Plan plan = readBuilder.newScan().plan();
TableRead tableRead = readBuilder.newRead();
@@ -582,7 +619,12 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
reader.forEachRemaining(
row -> {
- result.add(new PaimonRecord(row.getLong(0),
row.getString(1).toString()));
+ PaimonRecord paimonRecord =
+ new PaimonRecord(row.getLong(0),
row.getString(1).toString());
+ if (table.schema().fieldNames().contains("score")) {
+ paimonRecord.setScore(row.getInt(2));
+ }
+ result.add(paimonRecord);
log.info("key_id:" + row.getLong(0) + ", name:" +
row.getString(1));
});
}
@@ -611,7 +653,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
private Catalog getCatalog() {
Options options = new Options();
if (isWindows) {
- options.set("warehouse", "file://" + CATALOG_DIR_WIN);
+ options.set("warehouse", CATALOG_DIR_WIN);
} else {
options.set("warehouse", "file://" + CATALOG_DIR);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
index 259bc0128a..e93f45d913 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
@@ -34,6 +34,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
@@ -50,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.awaitility.Awaitility.given;
@@ -204,4 +206,131 @@ public class PaimonSinkHdfsIT extends TestSuiteBase {
container.executeJob("/paimon_to_assert_with_hivecatalog.conf");
Assertions.assertEquals(0, readResult.getExitCode());
}
+
+ @TestTemplate
+ public void testSinkPaimonHdfsTruncateTable(TestContainer container)
throws Exception {
+ Container.ExecResult writeResult =
+
container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case1.conf");
+ Assertions.assertEquals(0, writeResult.getExitCode());
+ Container.ExecResult readResult =
+
container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case2.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(180L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ PaimonSinkConfig paimonSinkConfig =
+ new PaimonSinkConfig(
+
ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
+ PaimonCatalogLoader paimonCatalogLoader =
+ new PaimonCatalogLoader(paimonSinkConfig);
+ Catalog catalog =
paimonCatalogLoader.loadCatalog();
+ List<PaimonRecord> paimonRecords =
+ loadPaimonData(catalog,
"seatunnel_namespace11", "st_test");
+ Assertions.assertEquals(2, paimonRecords.size());
+ paimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 1) {
+ Assertions.assertEquals("Aa",
paimonRecord.getName());
+ }
+ if (paimonRecord.getPkId() == 2) {
+ Assertions.assertEquals("Bb",
paimonRecord.getName());
+ }
+ Assertions.assertEquals(200,
paimonRecord.getScore());
+ });
+ List<Long> ids =
+ paimonRecords.stream()
+ .map(PaimonRecord::getPkId)
+ .collect(Collectors.toList());
+ Assertions.assertFalse(ids.contains(3L));
+ });
+ }
+
+ @TestTemplate
+ public void testSinkPaimonHiveTruncateTable(TestContainer container)
throws Exception {
+ Container.ExecResult writeResult =
+
container.executeJob("/fake_sink_paimon_truncate_with_hive_case1.conf");
+ Assertions.assertEquals(0, writeResult.getExitCode());
+ Container.ExecResult readResult =
+
container.executeJob("/fake_sink_paimon_truncate_with_hive_case2.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(180L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ PaimonSinkConfig paimonSinkConfig =
+ new PaimonSinkConfig(
+
ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
+ PaimonCatalogLoader paimonCatalogLoader =
+ new PaimonCatalogLoader(paimonSinkConfig);
+ Catalog catalog =
paimonCatalogLoader.loadCatalog();
+ List<PaimonRecord> paimonRecords =
+ loadPaimonData(catalog,
"seatunnel_namespace12", "st_test");
+ Assertions.assertEquals(2, paimonRecords.size());
+ paimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 1) {
+ Assertions.assertEquals("Aa",
paimonRecord.getName());
+ }
+ if (paimonRecord.getPkId() == 2) {
+ Assertions.assertEquals("Bb",
paimonRecord.getName());
+ }
+ Assertions.assertEquals(200,
paimonRecord.getScore());
+ });
+ List<Long> ids =
+ paimonRecords.stream()
+ .map(PaimonRecord::getPkId)
+ .collect(Collectors.toList());
+ Assertions.assertFalse(ids.contains(3L));
+ });
+ }
+
+ @TestTemplate
+ public void testSinkPaimonHiveTruncateTable1(TestContainer container)
throws Exception {
+ PaimonSinkConfig paimonSinkConfig =
+ new
PaimonSinkConfig(ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
+ PaimonCatalogLoader paimonCatalogLoader = new
PaimonCatalogLoader(paimonSinkConfig);
+ Catalog catalog = paimonCatalogLoader.loadCatalog();
+ List<PaimonRecord> paimonRecords =
+ loadPaimonData(catalog, "seatunnel_namespace11", "st_test");
+ Assertions.assertEquals(2, paimonRecords.size());
+ paimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 1) {
+ Assertions.assertEquals("Aa", paimonRecord.getName());
+ }
+ if (paimonRecord.getPkId() == 2) {
+ Assertions.assertEquals("Bb", paimonRecord.getName());
+ }
+ Assertions.assertEquals(200, paimonRecord.getScore());
+ });
+ List<Long> ids =
+
paimonRecords.stream().map(PaimonRecord::getPkId).collect(Collectors.toList());
+ Assertions.assertFalse(ids.contains(3L));
+ }
+
+ private List<PaimonRecord> loadPaimonData(Catalog catalog, String dbName,
String tbName)
+ throws Exception {
+ FileStoreTable table = (FileStoreTable)
catalog.getTable(Identifier.create(dbName, tbName));
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ TableRead tableRead = readBuilder.newRead();
+ List<PaimonRecord> result = new ArrayList<>();
+ try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+ reader.forEachRemaining(
+ row -> {
+ PaimonRecord paimonRecord =
+ new PaimonRecord(row.getLong(0),
row.getString(1).toString());
+ if (table.schema().fieldNames().contains("score")) {
+ paimonRecord.setScore(row.getInt(2));
+ }
+ result.add(paimonRecord);
+ });
+ }
+ return result;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
new file mode 100644
index 0000000000..92f6f5c6de
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "hdfs:///tmp/paimon"
+ database = "seatunnel_namespace11"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.defaultFS = "hdfs://nameservice1"
+ dfs.nameservices = "nameservice1"
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+ dfs.client.failover.proxy.provider.nameservice1 =
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ dfs.client.use.datanode.hostname = "true"
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
new file mode 100644
index 0000000000..1a5eac7322
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "Aa", 200]
+ },
+ {
+ kind = INSERT
+ fields = [2, "Bb", 200]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "hdfs:///tmp/paimon"
+ database = "seatunnel_namespace11"
+ table = "st_test"
+ data_save_mode=DROP_DATA
+ paimon.hadoop.conf = {
+ fs.defaultFS = "hdfs://nameservice1"
+ dfs.nameservices = "nameservice1"
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+ dfs.client.failover.proxy.provider.nameservice1 =
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ dfs.client.use.datanode.hostname = "true"
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
new file mode 100644
index 0000000000..26e95870e3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "hdfs:///tmp/paimon"
+ catalog_type="hive"
+ catalog_uri="thrift://hadoop04:9083"
+ database = "seatunnel_namespace12"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.defaultFS = "hdfs://nameservice1"
+ dfs.nameservices = "nameservice1"
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+ dfs.client.failover.proxy.provider.nameservice1 =
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ dfs.client.use.datanode.hostname = "true"
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
new file mode 100644
index 0000000000..ef1e79b86e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "Aa", 200]
+ },
+ {
+ kind = INSERT
+ fields = [2, "Bb", 200]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "hdfs:///tmp/paimon"
+ catalog_type="hive"
+ catalog_uri="thrift://hadoop04:9083"
+ database = "seatunnel_namespace12"
+ table = "st_test"
+ data_save_mode=DROP_DATA
+ paimon.hadoop.conf = {
+ fs.defaultFS = "hdfs://nameservice1"
+ dfs.nameservices = "nameservice1"
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+ dfs.client.failover.proxy.provider.nameservice1 =
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ dfs.client.use.datanode.hostname = "true"
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
new file mode 100644
index 0000000000..e22474a06d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace10"
+ table = "st_test"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
new file mode 100644
index 0000000000..64cb24bc8e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "Aa", 200]
+ },
+ {
+ kind = INSERT
+ fields = [2, "Bb", 200]
+ }
+ ]
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "seatunnel_namespace10"
+ table = "st_test"
+ data_save_mode=DROP_DATA
+ }
+}