This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 959a23b47a [Feature][Connector-V2] Support the dynamic options of the 
paimon table read (#9981)
959a23b47a is described below

commit 959a23b47a0787dfd46fe2c9887a68f7aeafde50
Author: xiaochen <[email protected]>
AuthorDate: Tue Nov 11 11:55:18 2025 +0800

    [Feature][Connector-V2] Support the dynamic options of the paimon table 
read (#9981)
---
 docs/en/connector-v2/source/Paimon.md              |   6 +-
 docs/zh/connector-v2/source/Paimon.md              |   8 +-
 .../seatunnel/paimon/source/PaimonSource.java      |  20 +-
 .../paimon/source/PaimonSourceReader.java          |  10 +-
 .../converter/SqlToPaimonPredicateConverter.java   |  24 ++
 .../paimon/source/PaimonDynamicOptionsTest.java    |  87 ++++++++
 .../source/converter/SqlToPaimonConverterTest.java |  26 +++
 .../connector/paimon/PaimonDynamicOptionsIT.java   | 248 +++++++++++++++++++++
 ...n_to_assert_with_dynamic_options_of_branch.conf |  49 ++++
 ...to_assert_with_dynamic_options_of_incr_tag.conf |  60 +++++
 ...mon_to_assert_with_dynamic_options_of_tag1.conf |  49 ++++
 ...mon_to_assert_with_dynamic_options_of_tag2.conf |  49 ++++
 12 files changed, 621 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector-v2/source/Paimon.md 
b/docs/en/connector-v2/source/Paimon.md
index 12c4f83b36..b0cafbc94d 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -95,7 +95,11 @@ The filter condition of the table read. For example: `select 
* from st_test wher
 Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null, 
is not null, between...and, in, not in, like, and others are not supported.
 The Having, Group By, Order By clauses are currently unsupported, because 
these clauses are not supported by Paimon.
 you can also project specific columns, for example: select id, name from 
st_test where id > 100.
-The limit will be supported in the future.
+
+The `query` parameter also supports dynamic options, written as an `OPTIONS` hint after the table name:
+```sql
+SELECT * FROM table /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') 
*/;
+```
 
 Note: When the field after the where condition is a string or boolean value, 
its value must be enclosed in single quotes, otherwise an error will be 
reported. `For example: name='abc' or tag='true'`
 The field data types currently supported by where conditions are as follows:
diff --git a/docs/zh/connector-v2/source/Paimon.md 
b/docs/zh/connector-v2/source/Paimon.md
index 343afa1031..ad484c7f21 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -97,7 +97,13 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
 
 Projection 已支持,你可以选择特定的列,例如:select id, name from st_test where id > 100。
 
-由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`,未来版本将会支持  `limit`。
+由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`。
+
+query 参数支持动态参数设置:
+```sql
+SELECT * FROM table /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') 
*/;
+```
+
 
 注意:当 `where` 后的字段为字符串或布尔值时,其值必须使用单引号,否则将会报错。例如 `name='abc'` 或 `tag='true'`。
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index 2d970403ce..501a1c46b9 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -39,10 +39,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.Paimon
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
 
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.RowType;
 
+import lombok.extern.slf4j.Slf4j;
 import net.sf.jsqlparser.statement.select.PlainSelect;
 
 import java.util.LinkedList;
@@ -54,6 +55,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.
 import static 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
 
 /** Paimon connector source class. */
+@Slf4j
 public class PaimonSource
         implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, 
PaimonSourceState> {
 
@@ -64,7 +66,7 @@ public class PaimonSource
     private JobContext jobContext;
 
     private List<CatalogTable> catalogTables = Lists.newArrayList();
-    private Map<String, Table> paimonTables = Maps.newHashMap();
+    private Map<String, FileStoreTable> paimonTables = Maps.newHashMap();
     private Map<String, SeaTunnelRowType> seaTunnelRowTypes = 
Maps.newHashMap();
     private Map<String, ReadBuilder> readBuilders = Maps.newHashMap();
 
@@ -75,12 +77,18 @@ public class PaimonSource
                         tableConfig -> {
                             TablePath tablePath = tableConfig.getTablePath();
                             CatalogTable catalogTable = 
paimonCatalog.getTable(tablePath);
-                            Table paimonTable = 
paimonCatalog.getPaimonTable(tablePath);
+                            FileStoreTable paimonTable =
+                                    (FileStoreTable) 
paimonCatalog.getPaimonTable(tablePath);
+                            String query = tableConfig.getQuery();
+                            Map<String, String> dynamicOptions =
+                                    
SqlToPaimonPredicateConverter.parseDynamicOptions(query);
+                            if (!dynamicOptions.isEmpty()) {
+                                paimonTable = paimonTable.copy(dynamicOptions);
+                            }
                             RowType paimonRowType = paimonTable.rowType();
                             String[] filedNames =
                                     paimonRowType.getFieldNames().toArray(new 
String[0]);
-
-                            PlainSelect plainSelect = 
convertToPlainSelect(tableConfig.getQuery());
+                            PlainSelect plainSelect = 
convertToPlainSelect(query);
                             Predicate predicate = null;
                             int[] projectionIndex = null;
                             if (!Objects.isNull(plainSelect)) {
@@ -102,13 +110,11 @@ public class PaimonSource
                             this.seaTunnelRowTypes.put(
                                     tableKey,
                                     RowTypeConverter.convert(paimonRowType, 
projectionIndex));
-
                             ReadBuilder readBuilder =
                                     paimonTable
                                             .newReadBuilder()
                                             .withProjection(projectionIndex)
                                             .withFilter(predicate);
-
                             this.paimonTables.put(tableKey, paimonTable);
                             this.readBuilders.put(tableKey, readBuilder);
                         });
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index ac24e64d04..e8247cd0de 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -30,7 +30,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
 
@@ -51,14 +50,14 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
 
     private final Deque<PaimonSourceSplit> sourceSplits = new 
ConcurrentLinkedDeque<>();
     private final SourceReader.Context context;
-    private final Map<String, Table> tables;
+    private final Map<String, FileStoreTable> tables;
     private final Map<String, SeaTunnelRowType> seaTunnelRowTypes;
     private final Map<String, TableRead> tableReads;
     private volatile boolean noMoreSplit;
 
     public PaimonSourceReader(
             Context context,
-            Map<String, Table> tables,
+            Map<String, FileStoreTable> tables,
             Map<String, SeaTunnelRowType> seaTunnelRowTypes,
             Map<String, ReadBuilder> readBuilders) {
         this.context = context;
@@ -86,7 +85,7 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
             final PaimonSourceSplit split = sourceSplits.poll();
             if (Objects.nonNull(split)) {
                 String tableId = split.getTableId();
-                Table table = tables.get(tableId);
+                FileStoreTable table = tables.get(tableId);
                 SeaTunnelRowType seaTunnelRowType = 
seaTunnelRowTypes.get(tableId);
                 TableRead tableRead = tableReads.get(tableId);
                 try (final RecordReader<InternalRow> reader =
@@ -96,8 +95,7 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
                     while (rowIterator.hasNext()) {
                         final InternalRow row = rowIterator.next();
                         final SeaTunnelRow seaTunnelRow =
-                                RowConverter.convert(
-                                        row, seaTunnelRowType, 
((FileStoreTable) table).schema());
+                                RowConverter.convert(row, seaTunnelRowType, 
table.schema());
                         if 
(Boundedness.UNBOUNDED.equals(context.getBoundedness())) {
                             RowKind rowKind =
                                     
RowKindConverter.convertPaimonRowKind2SeatunnelRowkind(
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index 1a7ddd0a22..62070284af 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -67,7 +67,9 @@ import net.sf.jsqlparser.statement.select.SelectItem;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.regex.Matcher;
@@ -149,6 +151,28 @@ public class SqlToPaimonPredicateConverter {
         return parseExpressionToPredicate(builder, rowType, whereExpression);
     }
 
+    // Hint comment, e.g. /*+ OPTIONS('k1' = 'v1', 'k2' = 'v2') */. Whitespace around OPTIONS and
+    // before the closing marker is optional, and DOTALL lets the hint span several lines.
+    private static final Pattern DYNAMIC_OPTIONS_PATTERN =
+            Pattern.compile(
+                    "/\\*\\+\\s*OPTIONS\\s*\\((.*?)\\)\\s*\\*/",
+                    Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+
+    // One 'key' = 'value' pair inside the hint; both sides must be single-quoted.
+    private static final Pattern DYNAMIC_OPTION_KV_PATTERN =
+            Pattern.compile("'\\s*(.*?)\\s*'\\s*=\\s*'\\s*(.*?)\\s*'");
+
+    /**
+     * Extracts the dynamic table options carried by an {@code OPTIONS(...)} hint comment in the
+     * query, e.g. {@code OPTIONS('scan.tag-name' = 'my-tag', 'scan.snapshot-id' = '123')}.
+     *
+     * <p>Only the first hint in the query is considered. Keys and values must be single-quoted and
+     * are trimmed; when a key is repeated, the last value wins.
+     *
+     * @param sql the query text, may be {@code null} or blank
+     * @return a mutable map of option key to value; empty when the query is blank, has no hint, or
+     *     the hint contains no pairs
+     */
+    public static Map<String, String> parseDynamicOptions(String sql) {
+        Map<String, String> dynamicOptions = new HashMap<>();
+        if (StringUtils.isBlank(sql)) {
+            return dynamicOptions;
+        }
+        Matcher optionsMatcher = DYNAMIC_OPTIONS_PATTERN.matcher(sql);
+        if (!optionsMatcher.find()) {
+            return dynamicOptions;
+        }
+        Matcher kvMatcher = DYNAMIC_OPTION_KV_PATTERN.matcher(optionsMatcher.group(1));
+        while (kvMatcher.find()) {
+            dynamicOptions.put(kvMatcher.group(1).trim(), kvMatcher.group(2).trim());
+        }
+        return dynamicOptions;
+    }
+
     private static Predicate parseExpressionToPredicate(
             PredicateBuilder builder, RowType rowType, Expression expression) {
         if (expression instanceof IsNullExpression) {
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java
new file mode 100644
index 0000000000..23f70121e8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source;
+
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for the dynamic-options hint parsing in {@link SqlToPaimonPredicateConverter}. */
+public class PaimonDynamicOptionsTest {
+
+    /** Shorthand for the method under test. */
+    private static Map<String, String> parse(String sql) {
+        return SqlToPaimonPredicateConverter.parseDynamicOptions(sql);
+    }
+
+    /** A single option whose value contains spaces, colons and a comma is returned intact. */
+    @Test
+    public void testParseDynamicOptionsWithIncrementalTimestamp() {
+        Map<String, String> options =
+                parse(
+                        "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3");
+
+        assertEquals(1, options.size());
+        assertTrue(options.containsKey("incremental-between-timestamp"));
+        assertEquals(
+                "2025-03-12 00:00:00,2025-03-12 00:08:00",
+                options.get("incremental-between-timestamp"));
+    }
+
+    /** A single dotted option key is parsed together with its value. */
+    @Test
+    public void testParseDynamicOptionsWithScanTag() {
+        Map<String, String> options =
+                parse(
+                        "SELECT * FROM table /*+ OPTIONS('scan.tag-name' = 'my-tag') */ WHERE int_col > 3");
+
+        assertEquals(1, options.size());
+        assertTrue(options.containsKey("scan.tag-name"));
+        assertEquals("my-tag", options.get("scan.tag-name"));
+    }
+
+    /** Several comma-separated pairs in one hint are all returned. */
+    @Test
+    public void testParseDynamicOptionsWithMultipleOptions() {
+        Map<String, String> options =
+                parse(
+                        "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag', 'scan.snapshot-id' = '123') */ WHERE int_col > 3");
+
+        assertEquals(3, options.size());
+        assertTrue(options.containsKey("incremental-between-timestamp"));
+        assertTrue(options.containsKey("scan.tag-name"));
+        assertTrue(options.containsKey("scan.snapshot-id"));
+        assertEquals(
+                "2025-03-12 00:00:00,2025-03-12 00:08:00",
+                options.get("incremental-between-timestamp"));
+        assertEquals("my-tag", options.get("scan.tag-name"));
+        assertEquals("123", options.get("scan.snapshot-id"));
+    }
+
+    /** A query without any hint yields an empty map. */
+    @Test
+    public void testParseDynamicOptionsWithNoOptions() {
+        Map<String, String> options = parse("SELECT * FROM table WHERE int_col > 3");
+
+        assertTrue(options.isEmpty());
+    }
+
+    /** A hint with an empty option list yields an empty map. */
+    @Test
+    public void testParseDynamicOptionsWithEmptyOptions() {
+        Map<String, String> options =
+                parse("SELECT * FROM table /*+ OPTIONS() */ WHERE int_col > 3");
+
+        assertTrue(options.isEmpty());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
index 812d1da98e..9c0bfcd6a6 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
@@ -49,12 +49,14 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Arrays;
+import java.util.Map;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class SqlToPaimonConverterTest {
 
@@ -292,4 +294,28 @@ public class SqlToPaimonConverterTest {
 
         assertEquals(expectedPredicate.toString(), predicate.toString());
     }
+
+    /** Dynamic options are extracted from the hint regardless of the WHERE clause that follows. */
+    @Test
+    public void testParseDynamicOptions() {
+        // One option in the hint.
+        String singleOptionSql =
+                "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 ";
+        Map<String, String> single =
+                SqlToPaimonPredicateConverter.parseDynamicOptions(singleOptionSql);
+        assertEquals(1, single.size());
+        assertTrue(single.containsKey("incremental-between-timestamp"));
+        assertEquals(
+                "2025-03-12 00:00:00,2025-03-12 00:08:00",
+                single.get("incremental-between-timestamp"));
+
+        // Two options in the same hint.
+        String twoOptionsSql =
+                "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag') */ WHERE int_col > 3 OR double_col < 6.6 ";
+        Map<String, String> pair = SqlToPaimonPredicateConverter.parseDynamicOptions(twoOptionsSql);
+        assertEquals(2, pair.size());
+        assertTrue(pair.containsKey("incremental-between-timestamp"));
+        assertTrue(pair.containsKey("scan.tag-name"));
+        assertEquals(
+                "2025-03-12 00:00:00,2025-03-12 00:08:00",
+                pair.get("incremental-between-timestamp"));
+        assertEquals("my-tag", pair.get("scan.tag-name"));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java
new file mode 100644
index 0000000000..b3cbb0e42a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.BinaryArrayWriter;
+import org.apache.paimon.data.BinaryMap;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.math.BigDecimal;
+import java.nio.file.Path;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@DisabledOnContainer(
+        value = {TestContainerId.FLINK_1_13, TestContainerId.SPARK_2_4},
+        disabledReason =
+                "Paimon does not support flink 1.13, Spark 2.4.6 has a jar 
package(zstd-jni-version.jar) version compatibility issue.")
+public class PaimonDynamicOptionsIT extends TestSuiteBase implements 
TestResource {
+
+    private final String DATABASE_NAME = "default";
+    private final String TABLE_NAME = "st_test_p";
+
+    private static final String NAMESPACE = "paimon";
+    protected static String hostName = System.getProperty("user.name");
+    protected static final String CONTAINER_VOLUME_MOUNT_PATH = 
"/tmp/seatunnel_mnt";
+    protected static final boolean isWindows =
+            
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+    public static final String HOST_VOLUME_MOUNT_PATH =
+            isWindows
+                    ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName)
+                    : CONTAINER_VOLUME_MOUNT_PATH;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Path schemaPath = 
ContainerUtil.getResourcesFile("/schema-0.json").toPath();
+                container.copyFileToContainer(
+                        MountableFile.forHostPath(schemaPath),
+                        
"/tmp/seatunnel_mnt/paimon/default.db/st_test/schema/schema-0");
+                container.copyFileToContainer(
+                        MountableFile.forHostPath(schemaPath),
+                        
"/tmp/seatunnel_mnt/paimon/default.db/st_test_p/schema/schema-0");
+                container.copyFileToContainer(
+                        MountableFile.forHostPath(schemaPath),
+                        
"/tmp/seatunnel_mnt/paimon/default.db/st_test_p1/schema/schema-0");
+                container.execInContainer("chmod", "777", "-R", 
"/tmp/seatunnel_mnt/");
+            };
+
+    @Override
+    public void startUp() throws Exception {}
+
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {}
+
+    /**
+     * Writes five rows to the {@code test-branch} branch of the test table, then runs the job
+     * config that is expected to read that branch and checks that the job exits with code 0.
+     */
+    @TestTemplate
+    public void testPaimonDynamicOptionsOfBranch(TestContainer container) throws Exception {
+        String testBranchName = "test-branch";
+        FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME);
+        // The test runs once per container type, so the branch may already exist.
+        List<String> branches = table.branchManager().branches();
+        if (!branches.contains(testBranchName)) {
+            table.createBranch(testBranchName);
+        }
+        FileStoreTable fileStoreTableWithBranch = table.switchToBranch(testBranchName);
+
+        // try-with-resources releases the writer even when a write or the commit fails.
+        try (TableWriteImpl<?> write = fileStoreTableWithBranch.newWrite("3494269")) {
+            write.write(createTestRow(1L, "First record"));
+            write.write(createTestRow(2L, "Second record"));
+            write.write(createTestRow(3L, "Third record"));
+            write.write(createTestRow(4L, "Fourth record"));
+            write.write(createTestRow(5L, "Fifth record"));
+
+            List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
+            try (TableCommitImpl commit = fileStoreTableWithBranch.newCommit("3494269")) {
+                commit.commit(commitMessages);
+            }
+        }
+
+        Container.ExecResult textWriteResult =
+                container.executeJob("/paimon_to_assert_with_dynamic_options_of_branch.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+    }
+
+    /**
+     * Commits five rows and tags the result {@code test-tag1}, then commits two more rows and tags
+     * the result {@code test-tag2}. A job config is executed after each tag is created, followed by
+     * the incremental-tag job config; every job must exit with code 0.
+     */
+    @TestTemplate
+    public void testPaimonDynamicOptionsOfTag(TestContainer container) throws Exception {
+        String testTag1 = "test-tag1";
+        String testTag2 = "test-tag2";
+        FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME);
+
+        // One writer serves both commits; try-with-resources releases it even when a write,
+        // a commit or the intermediate job assertion fails.
+        try (TableWriteImpl<?> write = table.newWrite("3494269")) {
+            write.write(createTestRow(1L, "First record"));
+            write.write(createTestRow(2L, "Second record"));
+            write.write(createTestRow(3L, "Third record"));
+            write.write(createTestRow(4L, "Fourth record"));
+            write.write(createTestRow(5L, "Fifth record"));
+
+            List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
+            try (TableCommitImpl commit = table.newCommit("3494269")) {
+                commit.commit(commitMessages);
+            }
+            table.createTag(testTag1);
+
+            Container.ExecResult textWriteTag1 =
+                    container.executeJob("/paimon_to_assert_with_dynamic_options_of_tag1.conf");
+            Assertions.assertEquals(0, textWriteTag1.getExitCode());
+
+            write.write(createTestRow(6L, "Sixth record"));
+            write.write(createTestRow(7L, "Seventh record"));
+            commitMessages = write.prepareCommit(false, 1);
+            try (TableCommitImpl commit = table.newCommit("3494269")) {
+                commit.commit(commitMessages);
+            }
+            table.createTag(testTag2);
+        }
+
+        Container.ExecResult textWriteTag2 =
+                container.executeJob("/paimon_to_assert_with_dynamic_options_of_tag2.conf");
+        Assertions.assertEquals(0, textWriteTag2.getExitCode());
+
+        Container.ExecResult textWriteResult =
+                container.executeJob("/paimon_to_assert_with_dynamic_options_of_incr_tag.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+    }
+
+    /**
+     * Loads a table from the Paimon catalog rooted at the host-side warehouse directory.
+     *
+     * @param dbName database name
+     * @param tbName table name
+     * @return the catalog's table handle
+     * @throws RuntimeException if the table does not exist; the original {@link
+     *     Catalog.TableNotExistException} is kept as the cause
+     */
+    private Table getTable(String dbName, String tbName) {
+        Options options = new Options();
+        String warehouse =
+                String.format(
+                        "%s%s/%s", isWindows ? "" : "file://", HOST_VOLUME_MOUNT_PATH, NAMESPACE);
+        options.set("warehouse", warehouse);
+        try {
+            Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
+            return catalog.getTable(Identifier.create(dbName, tbName));
+        } catch (Catalog.TableNotExistException e) {
+            // Keep the cause and name the table so a failing run can be diagnosed from the log.
+            throw new RuntimeException(
+                    String.format(
+                            "Table %s.%s does not exist in warehouse %s",
+                            dbName, tbName, warehouse),
+                    e);
+        }
+    }
+
+    private GenericRow createTestRow(Long pkId, String description) {
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put("key1", "value1_" + pkId);
+        mapData.put("key2", "value2_" + pkId);
+        mapData.put("description", description);
+        BinaryArray keyArray = new BinaryArray();
+        BinaryArrayWriter keyWriter =
+                new BinaryArrayWriter(
+                        keyArray, 3, 
BinaryArray.calculateFixLengthPartSize(DataTypes.STRING()));
+        keyWriter.writeString(0, BinaryString.fromString("key1"));
+        keyWriter.writeString(1, BinaryString.fromString("key2"));
+        keyWriter.writeString(2, BinaryString.fromString("description"));
+        keyWriter.complete();
+
+        BinaryArray valueArray = new BinaryArray();
+        BinaryArrayWriter valueWriter =
+                new BinaryArrayWriter(
+                        valueArray, 3, 
BinaryArray.calculateFixLengthPartSize(DataTypes.STRING()));
+        valueWriter.writeString(0, BinaryString.fromString("value1_" + pkId));
+        valueWriter.writeString(1, BinaryString.fromString("value2_" + pkId));
+        valueWriter.writeString(2, BinaryString.fromString(description));
+        valueWriter.complete();
+
+        BinaryMap binaryMap = BinaryMap.valueOf(keyArray, valueArray);
+        BinaryArray intArray = new BinaryArray();
+        BinaryArrayWriter intArrayWriter =
+                new BinaryArrayWriter(
+                        intArray, 3, 
BinaryArray.calculateFixLengthPartSize(DataTypes.INT()));
+        intArrayWriter.writeInt(0, pkId.intValue());
+        intArrayWriter.writeInt(1, pkId.intValue() * 10);
+        intArrayWriter.writeInt(2, pkId.intValue() * 100);
+        intArrayWriter.complete();
+        return GenericRow.of(
+                pkId,
+                binaryMap,
+                intArray,
+                BinaryString.fromString(description + "_" + pkId),
+                pkId % 2 == 0,
+                (byte) (pkId % 128),
+                (short) (pkId * 10),
+                pkId.intValue() * 100,
+                pkId * 1000L,
+                pkId.floatValue() + 0.5f,
+                pkId.doubleValue() + 0.123,
+                Decimal.fromBigDecimal(new BigDecimal(pkId + ".12345678"), 30, 
8),
+                BinaryString.fromString("bytes_" + pkId).toBytes(),
+                DateTimeUtils.toInternal(LocalDate.of(2024, 1, pkId.intValue() 
% 28 + 1)),
+                Timestamp.fromLocalDateTime(
+                        LocalDateTime.of(
+                                2024,
+                                1,
+                                pkId.intValue() % 28 + 1,
+                                pkId.intValue() % 24,
+                                pkId.intValue() % 60,
+                                0)),
+                DateTimeUtils.toInternal(
+                        LocalTime.of(pkId.intValue() % 24, pkId.intValue() % 
60, 0)));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf
new file mode 100644
index 0000000000..1d6c8696af
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "default"
+    table = "st_test_p"
+    query = "SELECT * FROM st_test_p /*+ OPTIONS('branch' = 'test-branch') */"
+    plugin_output = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 5
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf
new file mode 100644
index 0000000000..59eac4591e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "default"
+    table = "st_test_p"
+    query = "SELECT * FROM st_test_p /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */"
+    plugin_output = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        }
+      ]
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf
new file mode 100644
index 0000000000..7875565e2f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "default"
+    table = "st_test_p"
+    query = "SELECT * FROM st_test_p  /*+ OPTIONS('scan.tag-name'='test-tag1') */ "
+    plugin_output = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 5
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf
new file mode 100644
index 0000000000..6d0a13dfb8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "default"
+    table = "st_test_p"
+    query = "SELECT * FROM st_test_p  /*+ OPTIONS('scan.tag-name'='test-tag2') */ "
+    plugin_output = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 7
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 7
+        }
+      ]
+    }
+  }
+}


Reply via email to