This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a46b7f463 [FLINK-38839][runtime] Support semicolon delimiter for 
table-options (#4199)
a46b7f463 is described below

commit a46b7f463a2a8150692c9e355065d7aa61db58ad
Author: Lanny Boarts <[email protected]>
AuthorDate: Sat Mar 21 00:35:19 2026 +0800

    [FLINK-38839][runtime] Support semicolon delimiter for table-options (#4199)
    
    Co-authored-by: yuxiqian <[email protected]>
    Co-authored-by: Leonard Xu <[email protected]>
---
 docs/content.zh/docs/core-concept/transform.md     |  33 ++--
 docs/content/docs/core-concept/transform.md        |  10 +-
 .../cli/parser/YamlPipelineDefinitionParser.java   |   8 +
 .../cdc/composer/definition/TransformDef.java      |  38 ++++-
 .../flink/translator/TransformTranslator.java      |   2 +
 .../src/test/resources/rules/unexpected.yaml       |   2 +-
 .../transform/PostTransformOperatorBuilder.java    |  24 +++
 .../operators/transform/PreTransformOperator.java  |   7 +-
 .../transform/PreTransformOperatorBuilder.java     |  24 +++
 .../transform/SchemaMetadataTransform.java         |  33 ++--
 .../runtime/operators/transform/TransformRule.java |   7 +
 .../transform/SchemaMetadataTransformTest.java     | 171 +++++++++++++++++++++
 12 files changed, 331 insertions(+), 28 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md 
b/docs/content.zh/docs/core-concept/transform.md
index 99b87ae73..b01274ad0 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -31,16 +31,17 @@ under the License.
 # 参数
 为了定义一个 transform 规则,可以使用以下参数:
 
-| 参数                        | 含义                                               
      | 是否必填     |
-|---------------------------|--------------------------------------------------------|----------|
-| source-table              | 源表 ID,支持正则表达式                                    
     | 必填       |
-| projection                | 投影规则,支持类似 SQL 中 SELECT 子句的语法                     
      | 可选       |
-| filter                    | 过滤规则,支持类似 SQL 中 WHERE 子句的语法                      
      | 可选       |
-| primary-keys              | 目标表主键,以逗号分隔                                      
      | 可选       |
-| partition-keys            | 目标表分区键,以逗号分隔                                     
      | 可选       |
-| table-options             | 用于配置自动建表时的建表语句                                   
      | 可选       |
-| converter-after-transform | 用于在 transform 处理后添加转换器来修改 DataChangeEvent        
        | 可选       |
-| description               | Transform 规则描述                                   
      | 可选       |
+| 参数                        | 含义                                        | 是否必填 
|
+|---------------------------|-------------------------------------------|------|
+| source-table              | 源表 ID,支持正则表达式                             | 必填   
|
+| projection                | 投影规则,支持类似 SQL 中 SELECT 子句的语法              | 可选   
|
+| filter                    | 过滤规则,支持类似 SQL 中 WHERE 子句的语法               | 可选   
|
+| primary-keys              | 目标表主键,以逗号分隔                               | 可选   
|
+| partition-keys            | 目标表分区键,以逗号分隔                              | 可选   
|
+| table-options             | 用于配置自动建表时的建表语句                            | 可选   
|
+| table-options.delimiter   | 多个表属性的分隔符, 默认值为 `,`       | 可选   |
+| converter-after-transform | 用于在 transform 处理后添加转换器来修改 DataChangeEvent | 可选   
|
+| description               | Transform 规则描述                            | 可选   
|
 
 多个 transform 规则可以声明在一个 pipeline YAML 文件中。
 
@@ -335,7 +336,13 @@ transform:
     table-options: comment=web order
     description: auto creating table options example
 ```
-小技巧:table-options 的格式是 `key1=value1,key2=value2`。
+小技巧:table-options 的格式是 `key1=value1,key2=value2`;如果 value 中包含逗号或其他特殊字符,可以使用 
`table-options.delimiter` 指定自定义分隔符(如 `;`、`|`、`$` 等):
+```yaml
+transform:
+  - source-table: mydb.web_order
+    table-options: 
sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+    table-options.delimiter: ";"
+```
 
 ## 分类映射
 在一张表同时被多个转换规则命中时,
@@ -467,8 +474,8 @@ transform:
 
 ## Embedding AI 模型
 
-Embedding AI 模型可以在 transform 规则中使用。
-为了使用 Embedding AI 模型,你需要下载内置模型的 jar,然后在 `flink-cdc.sh` 命令中添加 `--jar 
{$BUILT_IN_MODEL_PATH}`。
+内置 AI 模型可以在 transform 规则中使用。
+为了使用内置 AI 模型,你需要下载内置模型的 jar ,然后在 `flink-cdc.sh` 命令中添加 `--jar 
{$BUILT_IN_MODEL_PATH}`。
 
 如何定义一个 Embedding AI 模型:
 
diff --git a/docs/content/docs/core-concept/transform.md 
b/docs/content/docs/core-concept/transform.md
index 1617bc8df..5df6dbcf9 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -39,6 +39,7 @@ To describe a transform rule, the following parameters can be 
used:
 | primary-keys              | Sink table primary keys, separated by commas     
                                 | optional          |
 | partition-keys            | Sink table partition keys, separated by commas   
                                 | optional          |
 | table-options             | used to the configure table creation statement 
when automatically creating tables | optional          |
+| table-options.delimiter   | delimiter for table-options key-value pairs, 
default is `,`                       | optional          |
 | converter-after-transform | used to add a converter to change 
DataChangeEvent after transform                 | optional          |
 | description               | Transform rule description                       
                                 | optional          |
 
@@ -339,6 +340,13 @@ transform:
     description: auto creating table options example
 ```
 Tips: The format of table-options is `key1=value1,key2=value2`.
+If option values contain commas or other special characters, you can specify a 
custom delimiter using `table-options.delimiter` (such as `;`, `|`, `$`, etc.):
+```yaml
+transform:
+  - source-table: mydb.web_order
+    table-options: 
sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+    table-options.delimiter: ";"
+```
 
 ## Classification mapping
 If a table hits ultiple transform rules, only the first matched transform rule 
will apply.
@@ -538,4 +546,4 @@ The following built-in models are provided:
 
|---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | openai.model  | STRING | required          | Name of model to be called, for 
example: "text-embedding-3-small", Available options are 
"text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
 | openai.host   | STRING | required          | Host of the Model server to be 
connected, for example: `http://langchain4j.dev/demo/openai/v1`.                
                                                        |
-| openai.apikey | STRING | required          | Api Key for verification of the 
Model server, for example, "demo".                                              
                                                       |
\ No newline at end of file
+| openai.apikey | STRING | required          | Api Key for verification of the 
Model server, for example, "demo".                                              
                                                       |
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 79f51a705..831bb5e65 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -106,6 +106,8 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
 
     public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";
 
+    public static final String TRANSFORM_TABLE_OPTION_DELIMITER_KEY = 
"table-options.delimiter";
+
     private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
 
     /** Parse the specified pipeline definition file. */
@@ -347,6 +349,7 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
                         TRANSFORM_PRIMARY_KEY_KEY,
                         TRANSFORM_PARTITION_KEY_KEY,
                         TRANSFORM_TABLE_OPTION_KEY,
+                        TRANSFORM_TABLE_OPTION_DELIMITER_KEY,
                         TRANSFORM_DESCRIPTION_KEY,
                         TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY));
 
@@ -380,6 +383,10 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
                 
Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_KEY))
                         .map(JsonNode::asText)
                         .orElse(null);
+        String tableOptionsDelimiter =
+                
Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_DELIMITER_KEY))
+                        .map(JsonNode::asText)
+                        .orElse(",");
         String description =
                 
Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
                         .map(JsonNode::asText)
@@ -396,6 +403,7 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
+                tableOptionsDelimiter,
                 description,
                 postTransformConverter);
     }
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
index 43cac6095..0b00292d4 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
@@ -30,12 +30,14 @@ import java.util.Objects;
  *       definition.
  *   <li>filter: a string for filtering the row of matched table as output. 
Optional for the
  *       definition.
- *   <li>primaryKeys: a string for primary key columns for matching input 
table IDs, seperated by
+ *   <li>primaryKeys: a string for primary key columns for matching input 
table IDs, separated by
  *       `,`. Optional for the definition.
- *   <li>partitionKeys: a string for partition key columns for matching input 
table IDs, seperated
+ *   <li>partitionKeys: a string for partition key columns for matching input 
table IDs, separated
  *       by `,`. Optional for the definition.
  *   <li>tableOptions: a string for table options for matching input table 
IDs, options are
- *       seperated by `,`, key and value are seperated by `=`. Optional for 
the definition.
+ *       separated by `,`, key and value are separated by `=`. Optional for 
the definition.
+ *   <li>tableOptionsDelimiter: a string for delimiter of table options, 
default is `,`. Optional
+ *       for the definition.
  *   <li>description: description for the transformation. Optional for the 
definition.
  * </ul>
  */
@@ -47,6 +49,7 @@ public class TransformDef {
     private final String primaryKeys;
     private final String partitionKeys;
     private final String tableOptions;
+    private final String tableOptionsDelimiter;
     private final String postTransformConverter;
 
     public TransformDef(
@@ -56,6 +59,7 @@ public class TransformDef {
             String primaryKeys,
             String partitionKeys,
             String tableOptions,
+            String tableOptionsDelimiter,
             String description,
             String postTransformConverter) {
         this.sourceTable = sourceTable;
@@ -64,10 +68,32 @@ public class TransformDef {
         this.primaryKeys = primaryKeys;
         this.partitionKeys = partitionKeys;
         this.tableOptions = tableOptions;
+        this.tableOptionsDelimiter = tableOptionsDelimiter;
         this.description = description;
         this.postTransformConverter = postTransformConverter;
     }
 
+    public TransformDef(
+            String sourceTable,
+            String projection,
+            String filter,
+            String primaryKeys,
+            String partitionKeys,
+            String tableOptions,
+            String description,
+            String postTransformConverter) {
+        this(
+                sourceTable,
+                projection,
+                filter,
+                primaryKeys,
+                partitionKeys,
+                tableOptions,
+                ",",
+                description,
+                postTransformConverter);
+    }
+
     public String getSourceTable() {
         return sourceTable;
     }
@@ -96,6 +122,10 @@ public class TransformDef {
         return tableOptions;
     }
 
+    public String getTableOptionsDelimiter() {
+        return tableOptionsDelimiter;
+    }
+
     public String getPostTransformConverter() {
         return postTransformConverter;
     }
@@ -137,6 +167,7 @@ public class TransformDef {
                 && Objects.equals(primaryKeys, that.primaryKeys)
                 && Objects.equals(partitionKeys, that.partitionKeys)
                 && Objects.equals(tableOptions, that.tableOptions)
+                && Objects.equals(tableOptionsDelimiter, 
that.tableOptionsDelimiter)
                 && Objects.equals(postTransformConverter, 
that.postTransformConverter);
     }
 
@@ -150,6 +181,7 @@ public class TransformDef {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
+                tableOptionsDelimiter,
                 postTransformConverter);
     }
 }
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index a938d7ab0..6e296db4d 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -74,6 +74,7 @@ public class TransformTranslator {
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
                     transform.getTableOptions(),
+                    transform.getTableOptionsDelimiter(),
                     transform.getPostTransformConverter(),
                     supportedMetadataColumns);
         }
@@ -111,6 +112,7 @@ public class TransformTranslator {
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
                     transform.getTableOptions(),
+                    transform.getTableOptionsDelimiter(),
                     transform.getPostTransformConverter(),
                     supportedMetadataColumns);
         }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
index ff7339a4c..0b9257b1a 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
@@ -48,7 +48,7 @@ steps:
               sink: uuid
     error: |
       Unexpected key `mapping` in YAML transform block.
-      Allowed keys in this context are: [source-table, projection, filter, 
primary-keys, partition-keys, table-options, description, 
converter-after-transform]
+      Allowed keys in this context are: [source-table, projection, filter, 
primary-keys, partition-keys, table-options, table-options.delimiter, 
description, converter-after-transform]
       Note: option mapping: [{"source":"userUuid","sink":"uuid"}] is 
unexpected. It was silently ignored in previous versions, and probably should 
be removed.
   # Unexpected route block keys
   - type: submit
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
index f28bc6ce5..380d9343a 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
@@ -44,6 +44,28 @@ public class PostTransformOperatorBuilder {
             String tableOptions,
             String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        return addTransform(
+                tableInclusions,
+                projection,
+                filter,
+                primaryKey,
+                partitionKey,
+                tableOptions,
+                ",",
+                postTransformConverter,
+                supportedMetadataColumns);
+    }
+
+    public PostTransformOperatorBuilder addTransform(
+            String tableInclusions,
+            @Nullable String projection,
+            @Nullable String filter,
+            String primaryKey,
+            String partitionKey,
+            String tableOptions,
+            String tableOptionsDelimiter,
+            String postTransformConverter,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         transformRules.add(
                 new TransformRule(
                         tableInclusions,
@@ -52,6 +74,7 @@ public class PostTransformOperatorBuilder {
                         primaryKey,
                         partitionKey,
                         tableOptions,
+                        tableOptionsDelimiter,
                         postTransformConverter,
                         supportedMetadataColumns));
         return this;
@@ -67,6 +90,7 @@ public class PostTransformOperatorBuilder {
                         "",
                         "",
                         "",
+                        ",",
                         null,
                         new SupportedMetadataColumn[0]));
         return this;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 03909e3a7..cb8748d12 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -110,6 +110,7 @@ public class PreTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
             String primaryKeys = transformRule.getPrimaryKey();
             String partitionKeys = transformRule.getPartitionKey();
             String tableOptions = transformRule.getTableOption();
+            String tableOptionsDelimiter = 
transformRule.getTableOptionsDelimiter();
             Selectors selectors =
                     new 
Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
             transforms.add(
@@ -120,7 +121,11 @@ public class PreTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
             schemaMetadataTransformers.add(
                     new Tuple2<>(
                             selectors,
-                            new SchemaMetadataTransform(primaryKeys, 
partitionKeys, tableOptions)));
+                            new SchemaMetadataTransform(
+                                    primaryKeys,
+                                    partitionKeys,
+                                    tableOptions,
+                                    tableOptionsDelimiter)));
         }
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.hasAsteriskMap = new ConcurrentHashMap<>();
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
index bb4df65a5..285259921 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
@@ -43,6 +43,7 @@ public class PreTransformOperatorBuilder {
                         "",
                         "",
                         "",
+                        ",",
                         null,
                         new SupportedMetadataColumn[0]));
         return this;
@@ -57,6 +58,28 @@ public class PreTransformOperatorBuilder {
             String tableOption,
             @Nullable String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        return addTransform(
+                tableInclusions,
+                projection,
+                filter,
+                primaryKey,
+                partitionKey,
+                tableOption,
+                ",",
+                postTransformConverter,
+                supportedMetadataColumns);
+    }
+
+    public PreTransformOperatorBuilder addTransform(
+            String tableInclusions,
+            @Nullable String projection,
+            @Nullable String filter,
+            String primaryKey,
+            String partitionKey,
+            String tableOption,
+            String tableOptionsDelimiter,
+            @Nullable String postTransformConverter,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         transformRules.add(
                 new TransformRule(
                         tableInclusions,
@@ -65,6 +88,7 @@ public class PreTransformOperatorBuilder {
                         primaryKey,
                         partitionKey,
                         tableOption,
+                        tableOptionsDelimiter,
                         postTransformConverter,
                         supportedMetadataColumns));
         return this;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
index 3b7a09856..742322d78 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 /**
  * a Pojo class to describe the information of the 
primaryKeys/partitionKeys/options transformation
@@ -42,7 +43,10 @@ public class SchemaMetadataTransform implements Serializable 
{
     private Map<String, String> options = new HashMap<>();
 
     public SchemaMetadataTransform(
-            String primaryKeyString, String partitionKeyString, String 
tableOptionString) {
+            String primaryKeyString,
+            String partitionKeyString,
+            String tableOptionString,
+            String tableOptionsDelimiter) {
         if (!StringUtils.isNullOrWhitespaceOnly(primaryKeyString)) {
             String[] primaryKeyArr = primaryKeyString.split(",");
             for (int i = 0; i < primaryKeyArr.length; i++) {
@@ -58,15 +62,26 @@ public class SchemaMetadataTransform implements 
Serializable {
             partitionKeys = Arrays.asList(partitionKeyArr);
         }
         if (!StringUtils.isNullOrWhitespaceOnly(tableOptionString)) {
-            for (String tableOption : tableOptionString.split(",")) {
-                String[] kv = tableOption.split("=");
-                if (kv.length != 2) {
-                    throw new IllegalArgumentException(
-                            "table option format error: "
-                                    + tableOptionString
-                                    + ", it should be like 
`key1=value1,key2=value2`.");
+            // Use custom delimiter if provided, otherwise default to comma 
for backward
+            // compatibility.
+            // Note: We only check for null or empty string here, not 
whitespace-only,
+            // because whitespace characters like '\n', '\t' can be valid 
delimiters.
+            String delimiter =
+                    (tableOptionsDelimiter == null || 
tableOptionsDelimiter.isEmpty())
+                            ? ","
+                            : tableOptionsDelimiter;
+            String[] tableOptions = 
tableOptionString.split(Pattern.quote(delimiter));
+            for (String tableOption : tableOptions) {
+                {
+                    String[] kv = tableOption.split("=", 2);
+                    if (kv.length != 2) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "table option format error: %s, it 
should be like `key1=value1%skey2=value2`.",
+                                        tableOptionString, delimiter));
+                    }
+                    options.put(kv[0].trim(), kv[1].trim());
                 }
-                options.put(kv[0].trim(), kv[1].trim());
             }
         }
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
index 5c444fe42..824d5e0cf 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
@@ -34,6 +34,7 @@ public class TransformRule implements Serializable {
     private final String primaryKey;
     private final String partitionKey;
     private final String tableOption;
+    private final String tableOptionsDelimiter;
     private final @Nullable String postTransformConverter;
     private final SupportedMetadataColumn[] supportedMetadataColumns;
 
@@ -44,6 +45,7 @@ public class TransformRule implements Serializable {
             String primaryKey,
             String partitionKey,
             String tableOption,
+            String tableOptionsDelimiter,
             @Nullable String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
         this.tableInclusions = tableInclusions;
@@ -52,6 +54,7 @@ public class TransformRule implements Serializable {
         this.primaryKey = primaryKey;
         this.partitionKey = partitionKey;
         this.tableOption = tableOption;
+        this.tableOptionsDelimiter = tableOptionsDelimiter;
         this.postTransformConverter = postTransformConverter;
         this.supportedMetadataColumns = supportedMetadataColumns;
     }
@@ -82,6 +85,10 @@ public class TransformRule implements Serializable {
         return tableOption;
     }
 
+    public String getTableOptionsDelimiter() {
+        return tableOptionsDelimiter;
+    }
+
     @Nullable
     public String getPostTransformConverter() {
         return postTransformConverter;
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransformTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransformTest.java
new file mode 100644
index 000000000..d12f669fe
--- /dev/null
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransformTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Testcases for {@link SchemaMetadataTransform}. */
+class SchemaMetadataTransformTest {
+
+    @Test
+    void testTableOptionsWithCommaDelimiter() {
+        SchemaMetadataTransform transform =
+                new SchemaMetadataTransform(null, null, 
"key1=value1,key2=value2", null);
+        assertThat(transform.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+    }
+
+    @Test
+    void testTableOptionsWithSemicolonDelimiter() {
+        SchemaMetadataTransform transform =
+                new SchemaMetadataTransform(null, null, 
"key1=value1;key2=value2", ";");
+        assertThat(transform.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+    }
+
+    @Test
+    void testTableOptionsWithCommaInValue() {
+        SchemaMetadataTransform transform =
+                new SchemaMetadataTransform(
+                        null,
+                        null,
+                        "sequence.field=gxsj,jjsj;"
+                                + "file-index.range-bitmap.columns=jjsj;"
+                                + "file-index.bloom-filter.columns=jjdbh",
+                        ";");
+        assertThat(transform.getOptions())
+                .containsEntry("sequence.field", "gxsj,jjsj")
+                .containsEntry("file-index.range-bitmap.columns", "jjsj")
+                .containsEntry("file-index.bloom-filter.columns", "jjdbh");
+    }
+
+    @Test
+    void testTableOptionsSplitByFirstEqualSign() {
+        SchemaMetadataTransform transform =
+                new SchemaMetadataTransform(null, null, 
"key1=value=1;key2=value2", ";");
+        assertThat(transform.getOptions())
+                .containsEntry("key1", "value=1")
+                .containsEntry("key2", "value2");
+    }
+
+    @Test
+    void testTableOptionsWithCustomDelimiter() {
+        SchemaMetadataTransform transform =
+                new SchemaMetadataTransform(null, null, 
"key1=value1|key2=value2", "|");
+        assertThat(transform.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+    }
+
+    @Test
+    void testTableOptionsWithCustomDelimiterAndCommaInValue() {
+        SchemaMetadataTransform transform =
+                new SchemaMetadataTransform(
+                        null,
+                        null,
+                        
"sequence.field=gxsj,jjsj$file-index.range-bitmap.columns=jjsj",
+                        "$");
+        assertThat(transform.getOptions())
+                .containsEntry("sequence.field", "gxsj,jjsj")
+                .containsEntry("file-index.range-bitmap.columns", "jjsj");
+    }
+
+    @Test
+    void testTableOptionsWithRegexSpecialCharacterDelimiter() {
+        // Test with regex special characters like '.', '*', '+', '?', '[', 
']', '(', ')', etc.
+        // These characters have special meaning in regex and should be 
treated as literal
+        // delimiters.
+
+        // Test with '.' (dot) as delimiter
+        SchemaMetadataTransform transformDot =
+                new SchemaMetadataTransform(null, null, 
"key1=value1.key2=value2", ".");
+        assertThat(transformDot.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with '*' (asterisk) as delimiter
+        SchemaMetadataTransform transformAsterisk =
+                new SchemaMetadataTransform(null, null, 
"key1=value1*key2=value2", "*");
+        assertThat(transformAsterisk.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with '+' (plus) as delimiter
+        SchemaMetadataTransform transformPlus =
+                new SchemaMetadataTransform(null, null, 
"key1=value1+key2=value2", "+");
+        assertThat(transformPlus.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with '?' (question mark) as delimiter
+        SchemaMetadataTransform transformQuestion =
+                new SchemaMetadataTransform(null, null, 
"key1=value1?key2=value2", "?");
+        assertThat(transformQuestion.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with '[' (bracket) as delimiter
+        SchemaMetadataTransform transformBracket =
+                new SchemaMetadataTransform(null, null, 
"key1=value1[key2=value2", "[");
+        assertThat(transformBracket.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with '\\' (backslash) as delimiter
+        SchemaMetadataTransform transformBackslash =
+                new SchemaMetadataTransform(null, null, 
"key1=value1\\key2=value2", "\\");
+        assertThat(transformBackslash.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+    }
+
+    @Test
+    void testTableOptionsWithSpecialCharacterDelimiter() {
+        // Test with newline as delimiter
+        SchemaMetadataTransform transformNewline =
+                new SchemaMetadataTransform(null, null, 
"key1=value1\nkey2=value2", "\n");
+        assertThat(transformNewline.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with tab as delimiter
+        SchemaMetadataTransform transformTab =
+                new SchemaMetadataTransform(null, null, 
"key1=value1\tkey2=value2", "\t");
+        assertThat(transformTab.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with carriage return as delimiter
+        SchemaMetadataTransform transformCR =
+                new SchemaMetadataTransform(null, null, 
"key1=value1\rkey2=value2", "\r");
+        assertThat(transformCR.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+
+        // Test with CRLF (Windows line ending) as delimiter
+        SchemaMetadataTransform transformCRLF =
+                new SchemaMetadataTransform(null, null, 
"key1=value1\r\nkey2=value2", "\r\n");
+        assertThat(transformCRLF.getOptions())
+                .containsEntry("key1", "value1")
+                .containsEntry("key2", "value2");
+    }
+}


Reply via email to