This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c82b19ad06 [Improve][Connector-V2] Add branch option for paimon sink (#9982)
c82b19ad06 is described below

commit c82b19ad0605c8fe6a42fa412bfc6c1a562cd991
Author: xiaochen <[email protected]>
AuthorDate: Sun Dec 7 23:41:16 2025 +0800

    [Improve][Connector-V2] Add branch option for paimon sink (#9982)
---
 docs/en/connector-v2/sink/Paimon.md                |  1 +
 docs/zh/connector-v2/sink/Paimon.md                |  1 +
 .../paimon/catalog/PaimonCatalogFactory.java       |  3 +-
 .../seatunnel/paimon/config/PaimonSinkConfig.java  |  2 +
 .../seatunnel/paimon/config/PaimonSinkOptions.java |  3 +
 .../paimon/exception/PaimonConnectorErrorCode.java |  3 +-
 .../paimon/handler/PaimonSaveModeHandler.java      | 12 +++-
 .../seatunnel/paimon/sink/PaimonSink.java          | 40 ++++++++---
 .../seatunnel/paimon/sink/PaimonSinkFactory.java   |  1 +
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    | 14 ++++
 .../seatunnel/e2e/connector/paimon/PaimonIT.java   | 81 ++++++++++++++++++++++
 .../src/test/resources/fake_to_paimon_branch.conf  | 63 +++++++++++++++++
 12 files changed, 212 insertions(+), 12 deletions(-)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 17c1328fde..f854a3be79 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -78,6 +78,7 @@ libfb303-xxx.jar
 | paimon.hadoop.conf           | Map     | No       | -                        
    | Properties in hadoop conf                                                 
                                                                                
       |
 | paimon.hadoop.conf-path      | String  | No       | -                        
    | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files                                                           
            |
 | paimon.table.non-primary-key | Boolean | false    | -                        
    | Switch to create `table with PK` or `table without PK`. true : `table 
without PK`, false : `table with PK`                                            
           |
+| branch                       | String  | No       | main                     
    | The branch name of Paimon table to write data to. If the branch does not 
exist, an exception will be thrown.                                             
        |
 
 
 ## Checkpoint in batch mode
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index 08de11bea5..34d3e8004d 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -77,6 +77,7 @@ libfb303-xxx.jar
 | paimon.hadoop.conf           | Map  | 否    | -                            | 
Hadoop配置文件属性信息                                                                  
                     |
 | paimon.hadoop.conf-path      | 字符串  | 否    | -                            | 
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置          
                     |
 | paimon.table.non-primary-key | Boolean | false    | -                        
    | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表                              
                           |
+| branch                       | 字符串  | 否    | main                         | 
要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。                                             
                    |
 
 ## 批模式下的checkpoint
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index 0a4cdb00fc..ffe6d2821e 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -55,7 +55,8 @@ public class PaimonCatalogFactory implements CatalogFactory {
                         PaimonSinkOptions.DATA_SAVE_MODE,
                         PaimonSinkOptions.PRIMARY_KEYS,
                         PaimonSinkOptions.PARTITION_KEYS,
-                        PaimonSinkOptions.WRITE_PROPS)
+                        PaimonSinkOptions.WRITE_PROPS,
+                        PaimonSinkOptions.BRANCH)
                 .conditional(
                         PaimonBaseOptions.CATALOG_TYPE,
                         PaimonCatalogEnum.HIVE,
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index d54a65b80b..0093cbd89e 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -40,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig {
     private final DataSaveMode dataSaveMode;
     private final CoreOptions.ChangelogProducer changelogProducer;
     private final String changelogTmpPath;
+    private final String branch;
     private final Boolean nonPrimaryKey;
     private final List<String> primaryKeys;
     private final List<String> partitionKeys;
@@ -79,5 +80,6 @@ public class PaimonSinkConfig extends PaimonConfig {
         this.changelogTmpPath =
                 writeProps.getOrDefault(
                         PaimonSinkOptions.CHANGELOG_TMP_PATH, 
System.getProperty("java.io.tmpdir"));
+        this.branch = readonlyConfig.get(PaimonSinkOptions.BRANCH);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
index b56215e2d1..e1cd46e14e 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
@@ -68,4 +68,7 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
                     .defaultValue(new HashMap<>())
                     .withDescription(
                             "Properties passed through to paimon table 
initialization, such as 'file.format', 
'bucket'(org.apache.paimon.CoreOptions)");
+
+    public static final Option<String> BRANCH =
+            
Options.key("branch").stringType().noDefaultValue().withDescription("branch");
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index 2f648b6f25..f369b8cf66 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -32,7 +32,8 @@ public enum PaimonConnectorErrorCode implements 
SeaTunnelErrorCode {
     WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in 
dynamic bucket mode"),
     NON_PRIMARY_KEY_CHECK_ERROR(
             "PAIMON-10", "Primary keys should be empty when nonPrimaryKey is 
true"),
-    DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is 
incompatible. ");
+    DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is 
incompatible. "),
+    BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. ");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
index 7e93ee3512..a89edc02c5 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.handler;
 
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
@@ -26,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
 
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
 public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
@@ -33,6 +36,7 @@ public class PaimonSaveModeHandler extends 
DefaultSaveModeHandler {
     private SupportLoadTable<Table> supportLoadTable;
     private Catalog catalog;
     private CatalogTable catalogTable;
+    private String branch;
 
     public PaimonSaveModeHandler(
             SupportLoadTable supportLoadTable,
@@ -40,11 +44,13 @@ public class PaimonSaveModeHandler extends 
DefaultSaveModeHandler {
             DataSaveMode dataSaveMode,
             Catalog catalog,
             CatalogTable catalogTable,
-            String customSql) {
+            String customSql,
+            String branch) {
         super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
         this.supportLoadTable = supportLoadTable;
         this.catalog = catalog;
         this.catalogTable = catalogTable;
+        this.branch = branch;
     }
 
     @Override
@@ -52,9 +58,11 @@ public class PaimonSaveModeHandler extends 
DefaultSaveModeHandler {
         super.handleSchemaSaveMode();
         TablePath tablePath = catalogTable.getTablePath();
         Table paimonTable = ((PaimonCatalog) 
catalog).getPaimonTable(tablePath);
-        // load paimon table and set it into paimon sink
         Table loadTable = this.supportLoadTable.getLoadTable();
         if (loadTable == null || this.schemaSaveMode == 
SchemaSaveMode.RECREATE_SCHEMA) {
+            if (StringUtils.isNotEmpty(branch)) {
+                paimonTable = ((FileStoreTable) 
paimonTable).switchToBranch(branch);
+            }
             this.supportLoadTable.setLoadTable(paimonTable);
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 5b185004c6..a6744fc593 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -35,6 +37,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
@@ -43,7 +47,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggreg
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
 
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.BranchManager;
+
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -51,6 +59,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
 
+@Slf4j
 public class PaimonSink
         implements SeaTunnelSink<
                         SeaTunnelRow,
@@ -66,7 +75,7 @@ public class PaimonSink
 
     public static final String PLUGIN_NAME = "Paimon";
 
-    private Table paimonTable;
+    private FileStoreTable paimonTable;
 
     private JobContext jobContext;
 
@@ -92,11 +101,25 @@ public class PaimonSink
             paimonCatalog.open();
             boolean databaseExists =
                     
paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace());
-            if (databaseExists) {
-                TablePath tablePath = catalogTable.getTablePath();
-                boolean tableExists = paimonCatalog.tableExists(tablePath);
-                if (tableExists) {
-                    this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
+            if (!databaseExists) {
+                return;
+            }
+            TablePath tablePath = catalogTable.getTablePath();
+            boolean tableExists = paimonCatalog.tableExists(tablePath);
+            if (!tableExists) {
+                return;
+            }
+            this.paimonTable = (FileStoreTable) 
paimonCatalog.getPaimonTable(tablePath);
+            String branchName = paimonSinkConfig.getBranch();
+            if (StringUtils.isNotEmpty(branchName)) {
+                BranchManager branchManager = paimonTable.branchManager();
+                if (!branchManager.branchExists(branchName)) {
+                    throw new PaimonConnectorException(
+                            PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, 
branchName);
+                }
+                if 
(!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
+                    this.paimonTable = paimonTable.switchToBranch(branchName);
+                    log.info("Switch to branch {}", branchName);
                 }
             }
         }
@@ -168,12 +191,13 @@ public class PaimonSink
                         paimonSinkConfig.getDataSaveMode(),
                         paimonCatalog,
                         catalogTable,
-                        null));
+                        null,
+                        paimonSinkConfig.getBranch()));
     }
 
     @Override
     public void setLoadTable(Table table) {
-        this.paimonTable = table;
+        this.paimonTable = (FileStoreTable) table;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index c71641a037..b316cdc09a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -59,6 +59,7 @@ public class PaimonSinkFactory implements TableSinkFactory {
                         PaimonSinkOptions.PRIMARY_KEYS,
                         PaimonSinkOptions.PARTITION_KEYS,
                         PaimonSinkOptions.WRITE_PROPS,
+                        PaimonSinkOptions.BRANCH,
                         SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(
                         PaimonSinkOptions.CATALOG_TYPE,
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 4a20d5d04c..2d8fad38c0 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
 
 import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -58,6 +59,7 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.utils.BranchManager;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -274,6 +276,18 @@ public class PaimonSinkWriter
     private void reOpenTableWrite() {
         this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
         this.paimonTable = (FileStoreTable) 
paimonCatalog.getPaimonTable(paimonTablePath);
+        String branchName = paimonSinkConfig.getBranch();
+        if (StringUtils.isNotEmpty(branchName)) {
+            BranchManager branchManager = paimonTable.branchManager();
+            if (!branchManager.branchExists(branchName)) {
+                throw new PaimonConnectorException(
+                        PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, 
branchName);
+            }
+            if 
(!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
+                this.paimonTable = this.paimonTable.switchToBranch(branchName);
+                log.info("Re-switched to branch {} after reopening table", 
branchName);
+            }
+        }
         this.sinkPaimonTableSchema = this.paimonTable.schema();
         this.newTableWrite();
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index d5ff43c10b..129200e413 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonBaseOptions
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -29,14 +30,24 @@ import 
org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.privilege.FileBasedPrivilegeManagerLoader;
 import org.apache.paimon.privilege.PrivilegeType;
 import org.apache.paimon.privilege.PrivilegedCatalog;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -64,6 +75,17 @@ public class PaimonIT extends TestSuiteBase implements 
TestResource {
     private final String DATABASE_NAME = "default";
     private final String TABLE_NAME = "st_test_p";
 
+    private static final String NAMESPACE = "paimon";
+    protected static String hostName = System.getProperty("user.name");
+    protected static final String CONTAINER_VOLUME_MOUNT_PATH = 
"/tmp/seatunnel_mnt";
+
+    protected static final boolean isWindows =
+            
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+    public static final String HOST_VOLUME_MOUNT_PATH =
+            isWindows
+                    ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName)
+                    : CONTAINER_VOLUME_MOUNT_PATH;
+
     @TestContainerExtension
     private final ContainerExtendedFactory extendedFactory =
             container -> {
@@ -204,4 +226,63 @@ public class PaimonIT extends TestSuiteBase implements 
TestResource {
         List<File> files = FileUtils.listFile(tmpDir);
         Assertions.assertTrue(CollectionUtils.isEmpty(files));
     }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "Spark and Flink engine can not auto create paimon table 
on worker node in local file(e.g flink tm) by savemode feature which can lead 
error")
+    @TestTemplate
+    public void testSinkBranch(TestContainer container) throws Exception {
+
+        String testBranchName = "test_branch";
+        FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, 
TABLE_NAME);
+        List<String> branches = table.branchManager().branches();
+        if (!branches.contains(testBranchName)) {
+            table.createBranch(testBranchName);
+        }
+        Container.ExecResult textWriteResult = 
container.executeJob("/fake_to_paimon_branch.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        long rowCount = getTableRowCount(table);
+        Assertions.assertEquals(0, rowCount);
+
+        FileStoreTable fileStoreTableWithBranch = 
table.switchToBranch(testBranchName);
+        rowCount = getTableRowCount(fileStoreTableWithBranch);
+        Assertions.assertEquals(10001, rowCount);
+    }
+
+    private Table getTable(String dbName, String tbName) {
+        Options options = new Options();
+        String warehouse =
+                String.format(
+                        "%s%s/%s", isWindows ? "" : "file://", 
HOST_VOLUME_MOUNT_PATH, NAMESPACE);
+        options.set("warehouse", warehouse);
+        try {
+            Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(options));
+            return catalog.getTable(Identifier.create(dbName, tbName));
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException("table not exist");
+        }
+    }
+
+    private long getTableRowCount(FileStoreTable table) {
+        try {
+            ReadBuilder readBuilder = table.newReadBuilder();
+            TableScan.Plan plan = readBuilder.newScan().plan();
+            TableRead tableRead = readBuilder.newRead();
+
+            long count = 0;
+            try (RecordReader<InternalRow> reader = 
tableRead.createReader(plan);
+                    RecordReaderIterator<InternalRow> iterator =
+                            new RecordReaderIterator<>(reader)) {
+                while (iterator.hasNext()) {
+                    iterator.next();
+                    count++;
+                }
+            }
+            return count;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to read data count from table", 
e);
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf
new file mode 100644
index 0000000000..ddff49d357
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    auto.increment.enabled = true
+    auto.increment.start = 1
+    row.num = 10001
+    schema = {
+      fields {
+        pk_id = bigint
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+        c_time = time
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    plugin_output = "fake"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "default"
+    table = "st_test_p"
+    branch = "test_branch"
+  }
+}

Reply via email to