This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c82b19ad06 [Improve][Connector-V2] Add branch option for paimon sink (#9982)
c82b19ad06 is described below
commit c82b19ad0605c8fe6a42fa412bfc6c1a562cd991
Author: xiaochen <[email protected]>
AuthorDate: Sun Dec 7 23:41:16 2025 +0800
[Improve][Connector-V2] Add branch option for paimon sink (#9982)
---
docs/en/connector-v2/sink/Paimon.md | 1 +
docs/zh/connector-v2/sink/Paimon.md | 1 +
.../paimon/catalog/PaimonCatalogFactory.java | 3 +-
.../seatunnel/paimon/config/PaimonSinkConfig.java | 2 +
.../seatunnel/paimon/config/PaimonSinkOptions.java | 3 +
.../paimon/exception/PaimonConnectorErrorCode.java | 3 +-
.../paimon/handler/PaimonSaveModeHandler.java | 12 +++-
.../seatunnel/paimon/sink/PaimonSink.java | 40 ++++++++---
.../seatunnel/paimon/sink/PaimonSinkFactory.java | 1 +
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 14 ++++
.../seatunnel/e2e/connector/paimon/PaimonIT.java | 81 ++++++++++++++++++++++
.../src/test/resources/fake_to_paimon_branch.conf | 63 +++++++++++++++++
12 files changed, 212 insertions(+), 12 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index 17c1328fde..f854a3be79 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -78,6 +78,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | No | -
| Properties in hadoop conf
|
| paimon.hadoop.conf-path | String | No | -
| The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
|
| paimon.table.non-primary-key | Boolean | false | -
| Switch to create `table with PK` or `table without PK`. true : `table
without PK`, false : `table with PK`
|
+| branch | String | No | main | The branch name of Paimon table to write data to. If the branch does not exist, an exception will be thrown. |
## Checkpoint in batch mode
diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md
index 08de11bea5..34d3e8004d 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -77,6 +77,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | 否 | - |
Hadoop配置文件属性信息
|
| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
| paimon.table.non-primary-key | Boolean | false | -
| 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表
|
+| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。 |
## 批模式下的checkpoint
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index 0a4cdb00fc..ffe6d2821e 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -55,7 +55,8 @@ public class PaimonCatalogFactory implements CatalogFactory {
PaimonSinkOptions.DATA_SAVE_MODE,
PaimonSinkOptions.PRIMARY_KEYS,
PaimonSinkOptions.PARTITION_KEYS,
- PaimonSinkOptions.WRITE_PROPS)
+ PaimonSinkOptions.WRITE_PROPS,
+ PaimonSinkOptions.BRANCH)
.conditional(
PaimonBaseOptions.CATALOG_TYPE,
PaimonCatalogEnum.HIVE,
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index d54a65b80b..0093cbd89e 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -40,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig {
private final DataSaveMode dataSaveMode;
private final CoreOptions.ChangelogProducer changelogProducer;
private final String changelogTmpPath;
+ private final String branch;
private final Boolean nonPrimaryKey;
private final List<String> primaryKeys;
private final List<String> partitionKeys;
@@ -79,5 +80,6 @@ public class PaimonSinkConfig extends PaimonConfig {
this.changelogTmpPath =
writeProps.getOrDefault(
PaimonSinkOptions.CHANGELOG_TMP_PATH,
System.getProperty("java.io.tmpdir"));
+ this.branch = readonlyConfig.get(PaimonSinkOptions.BRANCH);
}
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
index b56215e2d1..e1cd46e14e 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java
@@ -68,4 +68,7 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
.defaultValue(new HashMap<>())
.withDescription(
"Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");
+
+ public static final Option<String> BRANCH =
+ Options.key("branch").stringType().noDefaultValue().withDescription("branch");
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index 2f648b6f25..f369b8cf66 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -32,7 +32,8 @@ public enum PaimonConnectorErrorCode implements
SeaTunnelErrorCode {
WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in
dynamic bucket mode"),
NON_PRIMARY_KEY_CHECK_ERROR(
"PAIMON-10", "Primary keys should be empty when nonPrimaryKey is
true"),
- DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is
incompatible. ");
+ DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is
incompatible. "),
+ BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. ");
private final String code;
private final String description;
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
index 7e93ee3512..a89edc02c5 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.handler;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
@@ -26,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
public class PaimonSaveModeHandler extends DefaultSaveModeHandler {
@@ -33,6 +36,7 @@ public class PaimonSaveModeHandler extends
DefaultSaveModeHandler {
private SupportLoadTable<Table> supportLoadTable;
private Catalog catalog;
private CatalogTable catalogTable;
+ private String branch;
public PaimonSaveModeHandler(
SupportLoadTable supportLoadTable,
@@ -40,11 +44,13 @@ public class PaimonSaveModeHandler extends
DefaultSaveModeHandler {
DataSaveMode dataSaveMode,
Catalog catalog,
CatalogTable catalogTable,
- String customSql) {
+ String customSql,
+ String branch) {
super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql);
this.supportLoadTable = supportLoadTable;
this.catalog = catalog;
this.catalogTable = catalogTable;
+ this.branch = branch;
}
@Override
@@ -52,9 +58,11 @@ public class PaimonSaveModeHandler extends
DefaultSaveModeHandler {
super.handleSchemaSaveMode();
TablePath tablePath = catalogTable.getTablePath();
Table paimonTable = ((PaimonCatalog)
catalog).getPaimonTable(tablePath);
- // load paimon table and set it into paimon sink
Table loadTable = this.supportLoadTable.getLoadTable();
if (loadTable == null || this.schemaSaveMode ==
SchemaSaveMode.RECREATE_SCHEMA) {
+ if (StringUtils.isNotEmpty(branch)) {
+ paimonTable = ((FileStoreTable)
paimonTable).switchToBranch(branch);
+ }
this.supportLoadTable.setLoadTable(paimonTable);
}
}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 5b185004c6..a6744fc593 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -35,6 +37,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler;
import
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
@@ -43,7 +47,11 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggreg
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.BranchManager;
+
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Arrays;
@@ -51,6 +59,7 @@ import java.util.List;
import java.util.Optional;
import java.util.UUID;
+@Slf4j
public class PaimonSink
implements SeaTunnelSink<
SeaTunnelRow,
@@ -66,7 +75,7 @@ public class PaimonSink
public static final String PLUGIN_NAME = "Paimon";
- private Table paimonTable;
+ private FileStoreTable paimonTable;
private JobContext jobContext;
@@ -92,11 +101,25 @@ public class PaimonSink
paimonCatalog.open();
boolean databaseExists =
paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace());
- if (databaseExists) {
- TablePath tablePath = catalogTable.getTablePath();
- boolean tableExists = paimonCatalog.tableExists(tablePath);
- if (tableExists) {
- this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
+ if (!databaseExists) {
+ return;
+ }
+ TablePath tablePath = catalogTable.getTablePath();
+ boolean tableExists = paimonCatalog.tableExists(tablePath);
+ if (!tableExists) {
+ return;
+ }
+ this.paimonTable = (FileStoreTable)
paimonCatalog.getPaimonTable(tablePath);
+ String branchName = paimonSinkConfig.getBranch();
+ if (StringUtils.isNotEmpty(branchName)) {
+ BranchManager branchManager = paimonTable.branchManager();
+ if (!branchManager.branchExists(branchName)) {
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.BRANCH_NOT_EXISTS,
branchName);
+ }
+ if
(!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
+ this.paimonTable = paimonTable.switchToBranch(branchName);
+ log.info("Switch to branch {}", branchName);
}
}
}
@@ -168,12 +191,13 @@ public class PaimonSink
paimonSinkConfig.getDataSaveMode(),
paimonCatalog,
catalogTable,
- null));
+ null,
+ paimonSinkConfig.getBranch()));
}
@Override
public void setLoadTable(Table table) {
- this.paimonTable = table;
+ this.paimonTable = (FileStoreTable) table;
}
@Override
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index c71641a037..b316cdc09a 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -59,6 +59,7 @@ public class PaimonSinkFactory implements TableSinkFactory {
PaimonSinkOptions.PRIMARY_KEYS,
PaimonSinkOptions.PARTITION_KEYS,
PaimonSinkOptions.WRITE_PROPS,
+ PaimonSinkOptions.BRANCH,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
PaimonSinkOptions.CATALOG_TYPE,
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 4a20d5d04c..2d8fad38c0 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -58,6 +59,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.utils.BranchManager;
import lombok.extern.slf4j.Slf4j;
@@ -274,6 +276,18 @@ public class PaimonSinkWriter
private void reOpenTableWrite() {
this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
this.paimonTable = (FileStoreTable)
paimonCatalog.getPaimonTable(paimonTablePath);
+ String branchName = paimonSinkConfig.getBranch();
+ if (StringUtils.isNotEmpty(branchName)) {
+ BranchManager branchManager = paimonTable.branchManager();
+ if (!branchManager.branchExists(branchName)) {
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.BRANCH_NOT_EXISTS,
branchName);
+ }
+ if
(!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
+ this.paimonTable = this.paimonTable.switchToBranch(branchName);
+ log.info("Re-switched to branch {} after reopening table",
branchName);
+ }
+ }
this.sinkPaimonTableSchema = this.paimonTable.schema();
this.newTableWrite();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index d5ff43c10b..129200e413 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonBaseOptions
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -29,14 +30,24 @@ import
org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.options.Options;
import org.apache.paimon.privilege.FileBasedPrivilegeManagerLoader;
import org.apache.paimon.privilege.PrivilegeType;
import org.apache.paimon.privilege.PrivilegedCatalog;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -64,6 +75,17 @@ public class PaimonIT extends TestSuiteBase implements
TestResource {
private final String DATABASE_NAME = "default";
private final String TABLE_NAME = "st_test_p";
+ private static final String NAMESPACE = "paimon";
+ protected static String hostName = System.getProperty("user.name");
+ protected static final String CONTAINER_VOLUME_MOUNT_PATH =
"/tmp/seatunnel_mnt";
+
+ protected static final boolean isWindows =
+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+ public static final String HOST_VOLUME_MOUNT_PATH =
+ isWindows
+ ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName)
+ : CONTAINER_VOLUME_MOUNT_PATH;
+
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
@@ -204,4 +226,63 @@ public class PaimonIT extends TestSuiteBase implements
TestResource {
List<File> files = FileUtils.listFile(tmpDir);
Assertions.assertTrue(CollectionUtils.isEmpty(files));
}
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
+ @TestTemplate
+ public void testSinkBranch(TestContainer container) throws Exception {
+
+ String testBranchName = "test_branch";
+ FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME,
TABLE_NAME);
+ List<String> branches = table.branchManager().branches();
+ if (!branches.contains(testBranchName)) {
+ table.createBranch(testBranchName);
+ }
+ Container.ExecResult textWriteResult =
container.executeJob("/fake_to_paimon_branch.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ long rowCount = getTableRowCount(table);
+ Assertions.assertEquals(0, rowCount);
+
+ FileStoreTable fileStoreTableWithBranch =
table.switchToBranch(testBranchName);
+ rowCount = getTableRowCount(fileStoreTableWithBranch);
+ Assertions.assertEquals(10001, rowCount);
+ }
+
+ private Table getTable(String dbName, String tbName) {
+ Options options = new Options();
+ String warehouse =
+ String.format(
+ "%s%s/%s", isWindows ? "" : "file://",
HOST_VOLUME_MOUNT_PATH, NAMESPACE);
+ options.set("warehouse", warehouse);
+ try {
+ Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(options));
+ return catalog.getTable(Identifier.create(dbName, tbName));
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException("table not exist");
+ }
+ }
+
+ private long getTableRowCount(FileStoreTable table) {
+ try {
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ TableRead tableRead = readBuilder.newRead();
+
+ long count = 0;
+ try (RecordReader<InternalRow> reader =
tableRead.createReader(plan);
+ RecordReaderIterator<InternalRow> iterator =
+ new RecordReaderIterator<>(reader)) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ count++;
+ }
+ }
+ return count;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to read data count from table",
e);
+ }
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf
new file mode 100644
index 0000000000..ddff49d357
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_branch.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ auto.increment.enabled = true
+ auto.increment.start = 1
+ row.num = 10001
+ schema = {
+ fields {
+ pk_id = bigint
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ plugin_output = "fake"
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ database = "default"
+ table = "st_test_p"
+ branch = "test_branch"
+ }
+}