This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f3df22124 [Feature][CONNECTORS-V2-Paimon] Paimon Sink supported 
truncate table (#7560)
4f3df22124 is described below

commit 4f3df221247d57e52c75ebceeb0d60aa9c8db661
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Sep 4 12:06:39 2024 +0800

    [Feature][CONNECTORS-V2-Paimon] Paimon Sink supported truncate table (#7560)
---
 .../seatunnel/paimon/catalog/PaimonCatalog.java    |  30 +++++
 .../e2e/connector/paimon/PaimonRecord.java         |   1 +
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      |  48 +++++++-
 .../e2e/connector/paimon/PaimonSinkHdfsIT.java     | 129 +++++++++++++++++++++
 .../fake_sink_paimon_truncate_with_hdfs_case1.conf |  80 +++++++++++++
 .../fake_sink_paimon_truncate_with_hdfs_case2.conf |  65 +++++++++++
 .../fake_sink_paimon_truncate_with_hive_case1.conf |  82 +++++++++++++
 .../fake_sink_paimon_truncate_with_hive_case2.conf |  67 +++++++++++
 ...fake_sink_paimon_truncate_with_local_case1.conf |  71 ++++++++++++
 ...fake_sink_paimon_truncate_with_local_case2.conf |  56 +++++++++
 10 files changed, 626 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index d896e01539..9e09035e2f 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
@@ -183,6 +184,35 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
         }
     }
 
+    /**
+     * Truncates a table by dropping it and re-creating it with the schema it had before the drop.
+     *
+     * <p>The existing schema is read first and converted with {@code buildPaimonSchema}, so the
+     * re-created table keeps its columns, options, primary keys, partition keys and comment.
+     *
+     * @param tablePath path of the table to truncate
+     * @param ignoreIfNotExists when {@code true}, a missing table is silently ignored; when {@code
+     *     false}, a missing table raises {@link TableNotExistException}
+     * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is
+     *     {@code false}
+     * @throws CatalogException if the table cannot be re-created because a table with the same
+     *     name already exists again after the drop
+     */
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            Identifier identifier = toIdentifier(tablePath);
+            FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+            Schema schema = buildPaimonSchema(table.schema());
+            dropTable(tablePath, ignoreIfNotExists);
+            // NOTE(review): the flag is forwarded unchanged as the third createTable argument,
+            // exactly as before; confirm that reusing "ignoreIfNotExists" there is intended.
+            catalog.createTable(identifier, schema, ignoreIfNotExists);
+        } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+            if (ignoreIfNotExists) {
+                // Nothing to truncate and the caller asked for a missing table to be ignored.
+                return;
+            }
+            throw new TableNotExistException(this.catalogName, tablePath);
+        } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
+            // The conflict is on the table, not the database, so do not report it as a
+            // database-level error; keep the original exception as the cause.
+            throw new CatalogException(
+                    "Failed to truncate table "
+                            + tablePath
+                            + " in catalog "
+                            + this.catalogName
+                            + ": table already exists when re-creating it",
+                    e);
+        } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
+            throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
+        }
+    }
+
+    /**
+     * Rebuilds a Paimon {@link Schema} from an existing table schema.
+     *
+     * <p>Copies every field (name, type, description) and then the options, primary keys,
+     * partition keys and comment of the given schema.
+     *
+     * @param schema the table schema to copy; must not be {@code null}
+     * @return a new {@link Schema} carrying the same definition
+     */
+    private Schema buildPaimonSchema(@NonNull org.apache.paimon.schema.TableSchema schema) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (DataField column : schema.fields()) {
+            schemaBuilder.column(column.name(), column.type(), column.description());
+        }
+        schemaBuilder.options(schema.options());
+        schemaBuilder.primaryKey(schema.primaryKeys());
+        schemaBuilder.partitionKeys(schema.partitionKeys());
+        schemaBuilder.comment(schema.comment());
+        return schemaBuilder.build();
+    }
+
     @Override
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
index 13dcd3d675..700bf25f51 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecord.java
@@ -32,6 +32,7 @@ import lombok.NoArgsConstructor;
 public class PaimonRecord {
     public Long pkId;
     public String name;
+    public Integer score;
     public String dt;
     public Timestamp oneTime;
     public Timestamp twoTime;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 4b1d7dd86c..0168cc8f53 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -508,6 +508,43 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertEquals(0, readResult4.getExitCode());
     }
 
+    /**
+     * Runs two batch jobs against {@code seatunnel_namespace10} in the local warehouse and checks
+     * that only the rows of the second job remain.
+     *
+     * <p>The second job's config sets {@code data_save_mode=DROP_DATA}, so the rows written by the
+     * first job (including id 3) must be gone afterwards.
+     */
+    @TestTemplate
+    public void testSinkPaimonTruncateTable(TestContainer container) throws Exception {
+        Container.ExecResult seedResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_local_case1.conf");
+        Assertions.assertEquals(0, seedResult.getExitCode());
+        Container.ExecResult truncateResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_local_case2.conf");
+        Assertions.assertEquals(0, truncateResult.getExitCode());
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(30L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy paimon to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            List<PaimonRecord> records =
+                                    loadPaimonData("seatunnel_namespace10", TARGET_TABLE);
+                            Assertions.assertEquals(2, records.size());
+                            for (PaimonRecord rec : records) {
+                                if (rec.getPkId() == 1) {
+                                    Assertions.assertEquals("Aa", rec.getName());
+                                }
+                                if (rec.getPkId() == 2) {
+                                    Assertions.assertEquals("Bb", rec.getName());
+                                }
+                                Assertions.assertEquals(200, rec.getScore());
+                            }
+                            // Id 3 only exists in the first job's data set.
+                            List<Long> remainingIds =
+                                    records.stream()
+                                            .map(PaimonRecord::getPkId)
+                                            .collect(Collectors.toList());
+                            Assertions.assertFalse(remainingIds.contains(3L));
+                        });
+    }
+
     protected final ContainerExtendedFactory containerExtendedFactory =
             container -> {
                 if (isWindows) {
@@ -568,7 +605,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
     }
 
     private List<PaimonRecord> loadPaimonData(String dbName, String tbName) 
throws Exception {
-        Table table = getTable(dbName, tbName);
+        FileStoreTable table = (FileStoreTable) getTable(dbName, tbName);
         ReadBuilder readBuilder = table.newReadBuilder();
         TableScan.Plan plan = readBuilder.newScan().plan();
         TableRead tableRead = readBuilder.newRead();
@@ -582,7 +619,12 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
         try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
             reader.forEachRemaining(
                     row -> {
-                        result.add(new PaimonRecord(row.getLong(0), 
row.getString(1).toString()));
+                        PaimonRecord paimonRecord =
+                                new PaimonRecord(row.getLong(0), 
row.getString(1).toString());
+                        if (table.schema().fieldNames().contains("score")) {
+                            paimonRecord.setScore(row.getInt(2));
+                        }
+                        result.add(paimonRecord);
                         log.info("key_id:" + row.getLong(0) + ", name:" + 
row.getString(1));
                     });
         }
@@ -611,7 +653,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
     private Catalog getCatalog() {
         Options options = new Options();
         if (isWindows) {
-            options.set("warehouse", "file://" + CATALOG_DIR_WIN);
+            options.set("warehouse", CATALOG_DIR_WIN);
         } else {
             options.set("warehouse", "file://" + CATALOG_DIR);
         }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
index 259bc0128a..e93f45d913 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
@@ -34,6 +34,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.awaitility.Awaitility.given;
 
@@ -204,4 +206,131 @@ public class PaimonSinkHdfsIT extends TestSuiteBase {
                 
container.executeJob("/paimon_to_assert_with_hivecatalog.conf");
         Assertions.assertEquals(0, readResult.getExitCode());
     }
+
+    /**
+     * Runs two batch jobs against {@code seatunnel_namespace11.st_test} on HDFS and checks that
+     * only the rows of the second job remain.
+     *
+     * <p>The second job's config sets {@code data_save_mode=DROP_DATA}, so the rows written by the
+     * first job (including id 3) must be gone afterwards.
+     */
+    @TestTemplate
+    public void testSinkPaimonHdfsTruncateTable(TestContainer container) throws Exception {
+        Container.ExecResult seedResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case1.conf");
+        Assertions.assertEquals(0, seedResult.getExitCode());
+        Container.ExecResult truncateResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case2.conf");
+        Assertions.assertEquals(0, truncateResult.getExitCode());
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(180L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Catalog catalog =
+                                    new PaimonCatalogLoader(
+                                                    new PaimonSinkConfig(
+                                                            ReadonlyConfig.fromMap(
+                                                                    PAIMON_SINK_PROPERTIES)))
+                                            .loadCatalog();
+                            List<PaimonRecord> records =
+                                    loadPaimonData(catalog, "seatunnel_namespace11", "st_test");
+                            Assertions.assertEquals(2, records.size());
+                            for (PaimonRecord rec : records) {
+                                if (rec.getPkId() == 1) {
+                                    Assertions.assertEquals("Aa", rec.getName());
+                                }
+                                if (rec.getPkId() == 2) {
+                                    Assertions.assertEquals("Bb", rec.getName());
+                                }
+                                Assertions.assertEquals(200, rec.getScore());
+                            }
+                            // Id 3 only exists in the first job's data set.
+                            List<Long> remainingIds =
+                                    records.stream()
+                                            .map(PaimonRecord::getPkId)
+                                            .collect(Collectors.toList());
+                            Assertions.assertFalse(remainingIds.contains(3L));
+                        });
+    }
+
+    /**
+     * Runs two batch jobs that use the Hive catalog against {@code seatunnel_namespace12.st_test}
+     * and checks that only the rows of the second job remain.
+     *
+     * <p>The second job's config sets {@code data_save_mode=DROP_DATA}, so the rows written by the
+     * first job (including id 3) must be gone afterwards.
+     */
+    @TestTemplate
+    public void testSinkPaimonHiveTruncateTable(TestContainer container) throws Exception {
+        Container.ExecResult seedResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_hive_case1.conf");
+        Assertions.assertEquals(0, seedResult.getExitCode());
+        Container.ExecResult truncateResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_hive_case2.conf");
+        Assertions.assertEquals(0, truncateResult.getExitCode());
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(180L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Catalog catalog =
+                                    new PaimonCatalogLoader(
+                                                    new PaimonSinkConfig(
+                                                            ReadonlyConfig.fromMap(
+                                                                    PAIMON_SINK_PROPERTIES)))
+                                            .loadCatalog();
+                            List<PaimonRecord> records =
+                                    loadPaimonData(catalog, "seatunnel_namespace12", "st_test");
+                            Assertions.assertEquals(2, records.size());
+                            for (PaimonRecord rec : records) {
+                                if (rec.getPkId() == 1) {
+                                    Assertions.assertEquals("Aa", rec.getName());
+                                }
+                                if (rec.getPkId() == 2) {
+                                    Assertions.assertEquals("Bb", rec.getName());
+                                }
+                                Assertions.assertEquals(200, rec.getScore());
+                            }
+                            // Id 3 only exists in the first job's data set.
+                            List<Long> remainingIds =
+                                    records.stream()
+                                            .map(PaimonRecord::getPkId)
+                                            .collect(Collectors.toList());
+                            Assertions.assertFalse(remainingIds.contains(3L));
+                        });
+    }
+
+    /**
+     * Checks the content of {@code seatunnel_namespace11.st_test} directly, without polling.
+     *
+     * <p>The two HDFS jobs are executed here so the test produces the data it asserts on and does
+     * not depend on another test having run first.
+     *
+     * <p>NOTE(review): the method name mentions Hive but the table read is the HDFS namespace
+     * ({@code seatunnel_namespace11}); confirm which scenario this test is meant to cover.
+     */
+    @TestTemplate
+    public void testSinkPaimonHiveTruncateTable1(TestContainer container) throws Exception {
+        Container.ExecResult seedResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case1.conf");
+        Assertions.assertEquals(0, seedResult.getExitCode());
+        Container.ExecResult truncateResult =
+                container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case2.conf");
+        Assertions.assertEquals(0, truncateResult.getExitCode());
+
+        PaimonSinkConfig paimonSinkConfig =
+                new PaimonSinkConfig(ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
+        PaimonCatalogLoader paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
+        Catalog catalog = paimonCatalogLoader.loadCatalog();
+        List<PaimonRecord> paimonRecords =
+                loadPaimonData(catalog, "seatunnel_namespace11", "st_test");
+        Assertions.assertEquals(2, paimonRecords.size());
+        paimonRecords.forEach(
+                paimonRecord -> {
+                    if (paimonRecord.getPkId() == 1) {
+                        Assertions.assertEquals("Aa", paimonRecord.getName());
+                    }
+                    if (paimonRecord.getPkId() == 2) {
+                        Assertions.assertEquals("Bb", paimonRecord.getName());
+                    }
+                    Assertions.assertEquals(200, paimonRecord.getScore());
+                });
+        // Id 3 only exists in the first job's data set.
+        List<Long> ids =
+                paimonRecords.stream().map(PaimonRecord::getPkId).collect(Collectors.toList());
+        Assertions.assertFalse(ids.contains(3L));
+    }
+
+    private List<PaimonRecord> loadPaimonData(Catalog catalog, String dbName, 
String tbName)
+            throws Exception {
+        FileStoreTable table = (FileStoreTable) 
catalog.getTable(Identifier.create(dbName, tbName));
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        TableRead tableRead = readBuilder.newRead();
+        List<PaimonRecord> result = new ArrayList<>();
+        try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+            reader.forEachRemaining(
+                    row -> {
+                        PaimonRecord paimonRecord =
+                                new PaimonRecord(row.getLong(0), 
row.getString(1).toString());
+                        if (table.schema().fieldNames().contains("score")) {
+                            paimonRecord.setScore(row.getInt(2));
+                        }
+                        result.add(paimonRecord);
+                    });
+        }
+        return result;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
new file mode 100644
index 0000000000..92f6f5c6de
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case1.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "hdfs:///tmp/paimon"
+    database = "seatunnel_namespace11"
+    table = "st_test"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
new file mode 100644
index 0000000000..1a5eac7322
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "Aa", 200]
+      },
+      {
+        kind = INSERT
+        fields = [2, "Bb", 200]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "hdfs:///tmp/paimon"
+    database = "seatunnel_namespace11"
+    table = "st_test"
+    data_save_mode=DROP_DATA
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
new file mode 100644
index 0000000000..26e95870e3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "hdfs:///tmp/paimon"
+    catalog_type="hive"
+    catalog_uri="thrift://hadoop04:9083"
+    database = "seatunnel_namespace12"
+    table = "st_test"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
new file mode 100644
index 0000000000..ef1e79b86e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "Aa", 200]
+      },
+      {
+        kind = INSERT
+        fields = [2, "Bb", 200]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "hdfs:///tmp/paimon"
+    catalog_type="hive"
+    catalog_uri="thrift://hadoop04:9083"
+    database = "seatunnel_namespace12"
+    table = "st_test"
+    data_save_mode=DROP_DATA
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
new file mode 100644
index 0000000000..e22474a06d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace10"
+    table = "st_test"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
new file mode 100644
index 0000000000..64cb24bc8e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "Aa", 200]
+      },
+      {
+        kind = INSERT
+        fields = [2, "Bb", 200]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace10"
+    table = "st_test"
+    data_save_mode=DROP_DATA
+  }
+}

Reply via email to