This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a46b7f463 [FLINK-38839][runtime] Support semicolon delimiter for
table-options (#4199)
a46b7f463 is described below
commit a46b7f463a2a8150692c9e355065d7aa61db58ad
Author: Lanny Boarts <[email protected]>
AuthorDate: Sat Mar 21 00:35:19 2026 +0800
[FLINK-38839][runtime] Support semicolon delimiter for table-options (#4199)
Co-authored-by: yuxiqian <[email protected]>
Co-authored-by: Leonard Xu <[email protected]>
---
docs/content.zh/docs/core-concept/transform.md | 33 ++--
docs/content/docs/core-concept/transform.md | 10 +-
.../cli/parser/YamlPipelineDefinitionParser.java | 8 +
.../cdc/composer/definition/TransformDef.java | 38 ++++-
.../flink/translator/TransformTranslator.java | 2 +
.../src/test/resources/rules/unexpected.yaml | 2 +-
.../transform/PostTransformOperatorBuilder.java | 24 +++
.../operators/transform/PreTransformOperator.java | 7 +-
.../transform/PreTransformOperatorBuilder.java | 24 +++
.../transform/SchemaMetadataTransform.java | 33 ++--
.../runtime/operators/transform/TransformRule.java | 7 +
.../transform/SchemaMetadataTransformTest.java | 171 +++++++++++++++++++++
12 files changed, 331 insertions(+), 28 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md
b/docs/content.zh/docs/core-concept/transform.md
index 99b87ae73..b01274ad0 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -31,16 +31,17 @@ under the License.
# 参数
为了定义一个 transform 规则,可以使用以下参数:
-| 参数 | 含义
| 是否必填 |
-|---------------------------|--------------------------------------------------------|----------|
-| source-table | 源表 ID,支持正则表达式
| 必填 |
-| projection | 投影规则,支持类似 SQL 中 SELECT 子句的语法
| 可选 |
-| filter | 过滤规则,支持类似 SQL 中 WHERE 子句的语法
| 可选 |
-| primary-keys | 目标表主键,以逗号分隔
| 可选 |
-| partition-keys | 目标表分区键,以逗号分隔
| 可选 |
-| table-options | 用于配置自动建表时的建表语句
| 可选 |
-| converter-after-transform | 用于在 transform 处理后添加转换器来修改 DataChangeEvent
| 可选 |
-| description | Transform 规则描述
| 可选 |
+| 参数 | 含义 | 是否必填
|
+|---------------------------|-------------------------------------------|------|
+| source-table | 源表 ID,支持正则表达式 | 必填
|
+| projection | 投影规则,支持类似 SQL 中 SELECT 子句的语法 | 可选
|
+| filter | 过滤规则,支持类似 SQL 中 WHERE 子句的语法 | 可选
|
+| primary-keys | 目标表主键,以逗号分隔 | 可选
|
+| partition-keys | 目标表分区键,以逗号分隔 | 可选
|
+| table-options | 用于配置自动建表时的建表语句 | 可选
|
+| table-options.delimiter | 多个表属性的分隔符, 默认值为 `,` | 可选 |
+| converter-after-transform | 用于在 transform 处理后添加转换器来修改 DataChangeEvent | 可选
|
+| description | Transform 规则描述 | 可选
|
多个 transform 规则可以声明在一个 pipeline YAML 文件中。
@@ -335,7 +336,13 @@ transform:
table-options: comment=web order
description: auto creating table options example
```
-小技巧:table-options 的格式是 `key1=value1,key2=value2`。
+小技巧:table-options 的格式是 `key1=value1,key2=value2`;如果 value 中包含逗号或其他特殊字符,可以使用
`table-options.delimiter` 指定自定义分隔符(如 `;`、`|`、`$` 等):
+```yaml
+transform:
+ - source-table: mydb.web_order
+ table-options:
sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+ table-options.delimiter: ";"
+```
## 分类映射
在一张表同时被多个转换规则命中时,
@@ -467,8 +474,8 @@ transform:
## Embedding AI 模型
-Embedding AI 模型可以在 transform 规则中使用。
-为了使用 Embedding AI 模型,你需要下载内置模型的 jar,然后在 `flink-cdc.sh` 命令中添加 `--jar
{$BUILT_IN_MODEL_PATH}`。
+内置 AI 模型可以在 transform 规则中使用。
+为了使用内置 AI 模型,你需要下载内置模型的 jar ,然后在 `flink-cdc.sh` 命令中添加 `--jar
{$BUILT_IN_MODEL_PATH}`。
如何定义一个 Embedding AI 模型:
diff --git a/docs/content/docs/core-concept/transform.md
b/docs/content/docs/core-concept/transform.md
index 1617bc8df..5df6dbcf9 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -39,6 +39,7 @@ To describe a transform rule, the following parameters can be
used:
| primary-keys | Sink table primary keys, separated by commas
| optional |
| partition-keys | Sink table partition keys, separated by commas
| optional |
| table-options | used to the configure table creation statement
when automatically creating tables | optional |
+| table-options.delimiter | delimiter for table-options key-value pairs,
default is `,` | optional |
| converter-after-transform | used to add a converter to change
DataChangeEvent after transform | optional |
| description | Transform rule description
| optional |
@@ -339,6 +340,13 @@ transform:
description: auto creating table options example
```
Tips: The format of table-options is `key1=value1,key2=value2`.
+If option values contain commas or other special characters, you can specify a
custom delimiter using `table-options.delimiter` (such as `;`, `|`, `$`, etc.):
+```yaml
+transform:
+ - source-table: mydb.web_order
+ table-options:
sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
+ table-options.delimiter: ";"
+```
## Classification mapping
If a table hits ultiple transform rules, only the first matched transform rule
will apply.
@@ -538,4 +546,4 @@ The following built-in models are provided:
|---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| openai.model | STRING | required | Name of model to be called, for
example: "text-embedding-3-small", Available options are
"text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
| openai.host | STRING | required | Host of the Model server to be
connected, for example: `http://langchain4j.dev/demo/openai/v1`.
|
-| openai.apikey | STRING | required | Api Key for verification of the
Model server, for example, "demo".
|
\ No newline at end of file
+| openai.apikey | STRING | required | Api Key for verification of the
Model server, for example, "demo".
|
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 79f51a705..831bb5e65 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -106,6 +106,8 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";
+ public static final String TRANSFORM_TABLE_OPTION_DELIMITER_KEY =
"table-options.delimiter";
+
private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
/** Parse the specified pipeline definition file. */
@@ -347,6 +349,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
TRANSFORM_PRIMARY_KEY_KEY,
TRANSFORM_PARTITION_KEY_KEY,
TRANSFORM_TABLE_OPTION_KEY,
+ TRANSFORM_TABLE_OPTION_DELIMITER_KEY,
TRANSFORM_DESCRIPTION_KEY,
TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY));
@@ -380,6 +383,10 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_KEY))
.map(JsonNode::asText)
.orElse(null);
+ String tableOptionsDelimiter =
+
Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_DELIMITER_KEY))
+ .map(JsonNode::asText)
+ .orElse(",");
String description =
Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
.map(JsonNode::asText)
@@ -396,6 +403,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
primaryKeys,
partitionKeys,
tableOptions,
+ tableOptionsDelimiter,
description,
postTransformConverter);
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
index 43cac6095..0b00292d4 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
@@ -30,12 +30,14 @@ import java.util.Objects;
* definition.
* <li>filter: a string for filtering the row of matched table as output.
Optional for the
* definition.
- * <li>primaryKeys: a string for primary key columns for matching input
table IDs, seperated by
+ * <li>primaryKeys: a string for primary key columns for matching input
table IDs, separated by
* `,`. Optional for the definition.
- * <li>partitionKeys: a string for partition key columns for matching input
table IDs, seperated
+ * <li>partitionKeys: a string for partition key columns for matching input
table IDs, separated
* by `,`. Optional for the definition.
* <li>tableOptions: a string for table options for matching input table
IDs, options are
- * seperated by `,`, key and value are seperated by `=`. Optional for
the definition.
+ * separated by `,`, key and value are separated by `=`. Optional for
the definition.
+ * <li>tableOptionsDelimiter: a string for delimiter of table options,
default is `,`. Optional
+ * for the definition.
* <li>description: description for the transformation. Optional for the
definition.
* </ul>
*/
@@ -47,6 +49,7 @@ public class TransformDef {
private final String primaryKeys;
private final String partitionKeys;
private final String tableOptions;
+ private final String tableOptionsDelimiter;
private final String postTransformConverter;
public TransformDef(
@@ -56,6 +59,7 @@ public class TransformDef {
String primaryKeys,
String partitionKeys,
String tableOptions,
+ String tableOptionsDelimiter,
String description,
String postTransformConverter) {
this.sourceTable = sourceTable;
@@ -64,10 +68,32 @@ public class TransformDef {
this.primaryKeys = primaryKeys;
this.partitionKeys = partitionKeys;
this.tableOptions = tableOptions;
+ this.tableOptionsDelimiter = tableOptionsDelimiter;
this.description = description;
this.postTransformConverter = postTransformConverter;
}
+ public TransformDef(
+ String sourceTable,
+ String projection,
+ String filter,
+ String primaryKeys,
+ String partitionKeys,
+ String tableOptions,
+ String description,
+ String postTransformConverter) {
+ this(
+ sourceTable,
+ projection,
+ filter,
+ primaryKeys,
+ partitionKeys,
+ tableOptions,
+ ",",
+ description,
+ postTransformConverter);
+ }
+
public String getSourceTable() {
return sourceTable;
}
@@ -96,6 +122,10 @@ public class TransformDef {
return tableOptions;
}
+ public String getTableOptionsDelimiter() {
+ return tableOptionsDelimiter;
+ }
+
public String getPostTransformConverter() {
return postTransformConverter;
}
@@ -137,6 +167,7 @@ public class TransformDef {
&& Objects.equals(primaryKeys, that.primaryKeys)
&& Objects.equals(partitionKeys, that.partitionKeys)
&& Objects.equals(tableOptions, that.tableOptions)
+ && Objects.equals(tableOptionsDelimiter,
that.tableOptionsDelimiter)
&& Objects.equals(postTransformConverter,
that.postTransformConverter);
}
@@ -150,6 +181,7 @@ public class TransformDef {
primaryKeys,
partitionKeys,
tableOptions,
+ tableOptionsDelimiter,
postTransformConverter);
}
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index a938d7ab0..6e296db4d 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -74,6 +74,7 @@ public class TransformTranslator {
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
+ transform.getTableOptionsDelimiter(),
transform.getPostTransformConverter(),
supportedMetadataColumns);
}
@@ -111,6 +112,7 @@ public class TransformTranslator {
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
+ transform.getTableOptionsDelimiter(),
transform.getPostTransformConverter(),
supportedMetadataColumns);
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
index ff7339a4c..0b9257b1a 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/rules/unexpected.yaml
@@ -48,7 +48,7 @@ steps:
sink: uuid
error: |
Unexpected key `mapping` in YAML transform block.
- Allowed keys in this context are: [source-table, projection, filter,
primary-keys, partition-keys, table-options, description,
converter-after-transform]
+ Allowed keys in this context are: [source-table, projection, filter,
primary-keys, partition-keys, table-options, table-options.delimiter,
description, converter-after-transform]
Note: option mapping: [{"source":"userUuid","sink":"uuid"}] is
unexpected. It was silently ignored in previous versions, and probably should
be removed.
# Unexpected route block keys
- type: submit
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
index f28bc6ce5..380d9343a 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorBuilder.java
@@ -44,6 +44,28 @@ public class PostTransformOperatorBuilder {
String tableOptions,
String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
+ return addTransform(
+ tableInclusions,
+ projection,
+ filter,
+ primaryKey,
+ partitionKey,
+ tableOptions,
+ ",",
+ postTransformConverter,
+ supportedMetadataColumns);
+ }
+
+ public PostTransformOperatorBuilder addTransform(
+ String tableInclusions,
+ @Nullable String projection,
+ @Nullable String filter,
+ String primaryKey,
+ String partitionKey,
+ String tableOptions,
+ String tableOptionsDelimiter,
+ String postTransformConverter,
+ SupportedMetadataColumn[] supportedMetadataColumns) {
transformRules.add(
new TransformRule(
tableInclusions,
@@ -52,6 +74,7 @@ public class PostTransformOperatorBuilder {
primaryKey,
partitionKey,
tableOptions,
+ tableOptionsDelimiter,
postTransformConverter,
supportedMetadataColumns));
return this;
@@ -67,6 +90,7 @@ public class PostTransformOperatorBuilder {
"",
"",
"",
+ ",",
null,
new SupportedMetadataColumn[0]));
return this;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 03909e3a7..cb8748d12 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -110,6 +110,7 @@ public class PreTransformOperator extends
AbstractStreamOperatorAdapter<Event>
String primaryKeys = transformRule.getPrimaryKey();
String partitionKeys = transformRule.getPartitionKey();
String tableOptions = transformRule.getTableOption();
+ String tableOptionsDelimiter =
transformRule.getTableOptionsDelimiter();
Selectors selectors =
new
Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
transforms.add(
@@ -120,7 +121,11 @@ public class PreTransformOperator extends
AbstractStreamOperatorAdapter<Event>
schemaMetadataTransformers.add(
new Tuple2<>(
selectors,
- new SchemaMetadataTransform(primaryKeys,
partitionKeys, tableOptions)));
+ new SchemaMetadataTransform(
+ primaryKeys,
+ partitionKeys,
+ tableOptions,
+ tableOptionsDelimiter)));
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
index bb4df65a5..285259921 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java
@@ -43,6 +43,7 @@ public class PreTransformOperatorBuilder {
"",
"",
"",
+ ",",
null,
new SupportedMetadataColumn[0]));
return this;
@@ -57,6 +58,28 @@ public class PreTransformOperatorBuilder {
String tableOption,
@Nullable String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
+ return addTransform(
+ tableInclusions,
+ projection,
+ filter,
+ primaryKey,
+ partitionKey,
+ tableOption,
+ ",",
+ postTransformConverter,
+ supportedMetadataColumns);
+ }
+
+ public PreTransformOperatorBuilder addTransform(
+ String tableInclusions,
+ @Nullable String projection,
+ @Nullable String filter,
+ String primaryKey,
+ String partitionKey,
+ String tableOption,
+ String tableOptionsDelimiter,
+ @Nullable String postTransformConverter,
+ SupportedMetadataColumn[] supportedMetadataColumns) {
transformRules.add(
new TransformRule(
tableInclusions,
@@ -65,6 +88,7 @@ public class PreTransformOperatorBuilder {
primaryKey,
partitionKey,
tableOption,
+ tableOptionsDelimiter,
postTransformConverter,
supportedMetadataColumns));
return this;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
index 3b7a09856..742322d78 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransform.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
/**
* a Pojo class to describe the information of the
primaryKeys/partitionKeys/options transformation
@@ -42,7 +43,10 @@ public class SchemaMetadataTransform implements Serializable
{
private Map<String, String> options = new HashMap<>();
public SchemaMetadataTransform(
- String primaryKeyString, String partitionKeyString, String
tableOptionString) {
+ String primaryKeyString,
+ String partitionKeyString,
+ String tableOptionString,
+ String tableOptionsDelimiter) {
if (!StringUtils.isNullOrWhitespaceOnly(primaryKeyString)) {
String[] primaryKeyArr = primaryKeyString.split(",");
for (int i = 0; i < primaryKeyArr.length; i++) {
@@ -58,15 +62,26 @@ public class SchemaMetadataTransform implements
Serializable {
partitionKeys = Arrays.asList(partitionKeyArr);
}
if (!StringUtils.isNullOrWhitespaceOnly(tableOptionString)) {
- for (String tableOption : tableOptionString.split(",")) {
- String[] kv = tableOption.split("=");
- if (kv.length != 2) {
- throw new IllegalArgumentException(
- "table option format error: "
- + tableOptionString
- + ", it should be like
`key1=value1,key2=value2`.");
+ // Use custom delimiter if provided, otherwise default to comma
for backward
+ // compatibility.
+ // Note: We only check for null or empty string here, not
whitespace-only,
+ // because whitespace characters like '\n', '\t' can be valid
delimiters.
+ String delimiter =
+ (tableOptionsDelimiter == null ||
tableOptionsDelimiter.isEmpty())
+ ? ","
+ : tableOptionsDelimiter;
+ String[] tableOptions =
tableOptionString.split(Pattern.quote(delimiter));
+ for (String tableOption : tableOptions) {
+ {
+ String[] kv = tableOption.split("=", 2);
+ if (kv.length != 2) {
+ throw new IllegalArgumentException(
+ String.format(
+ "table option format error: %s, it
should be like `key1=value1%skey2=value2`.",
+ tableOptionString, delimiter));
+ }
+ options.put(kv[0].trim(), kv[1].trim());
}
- options.put(kv[0].trim(), kv[1].trim());
}
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
index 5c444fe42..824d5e0cf 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
@@ -34,6 +34,7 @@ public class TransformRule implements Serializable {
private final String primaryKey;
private final String partitionKey;
private final String tableOption;
+ private final String tableOptionsDelimiter;
private final @Nullable String postTransformConverter;
private final SupportedMetadataColumn[] supportedMetadataColumns;
@@ -44,6 +45,7 @@ public class TransformRule implements Serializable {
String primaryKey,
String partitionKey,
String tableOption,
+ String tableOptionsDelimiter,
@Nullable String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
this.tableInclusions = tableInclusions;
@@ -52,6 +54,7 @@ public class TransformRule implements Serializable {
this.primaryKey = primaryKey;
this.partitionKey = partitionKey;
this.tableOption = tableOption;
+ this.tableOptionsDelimiter = tableOptionsDelimiter;
this.postTransformConverter = postTransformConverter;
this.supportedMetadataColumns = supportedMetadataColumns;
}
@@ -82,6 +85,10 @@ public class TransformRule implements Serializable {
return tableOption;
}
+ public String getTableOptionsDelimiter() {
+ return tableOptionsDelimiter;
+ }
+
@Nullable
public String getPostTransformConverter() {
return postTransformConverter;
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransformTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransformTest.java
new file mode 100644
index 000000000..d12f669fe
--- /dev/null
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/SchemaMetadataTransformTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Testcases for {@link SchemaMetadataTransform}. */
+class SchemaMetadataTransformTest {
+
+ @Test
+ void testTableOptionsWithCommaDelimiter() {
+ SchemaMetadataTransform transform =
+ new SchemaMetadataTransform(null, null,
"key1=value1,key2=value2", null);
+ assertThat(transform.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+ }
+
+ @Test
+ void testTableOptionsWithSemicolonDelimiter() {
+ SchemaMetadataTransform transform =
+ new SchemaMetadataTransform(null, null,
"key1=value1;key2=value2", ";");
+ assertThat(transform.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+ }
+
+ @Test
+ void testTableOptionsWithCommaInValue() {
+ SchemaMetadataTransform transform =
+ new SchemaMetadataTransform(
+ null,
+ null,
+ "sequence.field=gxsj,jjsj;"
+ + "file-index.range-bitmap.columns=jjsj;"
+ + "file-index.bloom-filter.columns=jjdbh",
+ ";");
+ assertThat(transform.getOptions())
+ .containsEntry("sequence.field", "gxsj,jjsj")
+ .containsEntry("file-index.range-bitmap.columns", "jjsj")
+ .containsEntry("file-index.bloom-filter.columns", "jjdbh");
+ }
+
+ @Test
+ void testTableOptionsSplitByFirstEqualSign() {
+ SchemaMetadataTransform transform =
+ new SchemaMetadataTransform(null, null,
"key1=value=1;key2=value2", ";");
+ assertThat(transform.getOptions())
+ .containsEntry("key1", "value=1")
+ .containsEntry("key2", "value2");
+ }
+
+ @Test
+ void testTableOptionsWithCustomDelimiter() {
+ SchemaMetadataTransform transform =
+ new SchemaMetadataTransform(null, null,
"key1=value1|key2=value2", "|");
+ assertThat(transform.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+ }
+
+ @Test
+ void testTableOptionsWithCustomDelimiterAndCommaInValue() {
+ SchemaMetadataTransform transform =
+ new SchemaMetadataTransform(
+ null,
+ null,
+
"sequence.field=gxsj,jjsj$file-index.range-bitmap.columns=jjsj",
+ "$");
+ assertThat(transform.getOptions())
+ .containsEntry("sequence.field", "gxsj,jjsj")
+ .containsEntry("file-index.range-bitmap.columns", "jjsj");
+ }
+
+ @Test
+ void testTableOptionsWithRegexSpecialCharacterDelimiter() {
+ // Test with regex special characters like '.', '*', '+', '?', '[',
']', '(', ')', etc.
+ // These characters have special meaning in regex and should be
treated as literal
+ // delimiters.
+
+ // Test with '.' (dot) as delimiter
+ SchemaMetadataTransform transformDot =
+ new SchemaMetadataTransform(null, null,
"key1=value1.key2=value2", ".");
+ assertThat(transformDot.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with '*' (asterisk) as delimiter
+ SchemaMetadataTransform transformAsterisk =
+ new SchemaMetadataTransform(null, null,
"key1=value1*key2=value2", "*");
+ assertThat(transformAsterisk.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with '+' (plus) as delimiter
+ SchemaMetadataTransform transformPlus =
+ new SchemaMetadataTransform(null, null,
"key1=value1+key2=value2", "+");
+ assertThat(transformPlus.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with '?' (question mark) as delimiter
+ SchemaMetadataTransform transformQuestion =
+ new SchemaMetadataTransform(null, null,
"key1=value1?key2=value2", "?");
+ assertThat(transformQuestion.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with '[' (bracket) as delimiter
+ SchemaMetadataTransform transformBracket =
+ new SchemaMetadataTransform(null, null,
"key1=value1[key2=value2", "[");
+ assertThat(transformBracket.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with '\\' (backslash) as delimiter
+ SchemaMetadataTransform transformBackslash =
+ new SchemaMetadataTransform(null, null,
"key1=value1\\key2=value2", "\\");
+ assertThat(transformBackslash.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+ }
+
+ @Test
+ void testTableOptionsWithSpecialCharacterDelimiter() {
+ // Test with newline as delimiter
+ SchemaMetadataTransform transformNewline =
+ new SchemaMetadataTransform(null, null,
"key1=value1\nkey2=value2", "\n");
+ assertThat(transformNewline.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with tab as delimiter
+ SchemaMetadataTransform transformTab =
+ new SchemaMetadataTransform(null, null,
"key1=value1\tkey2=value2", "\t");
+ assertThat(transformTab.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with carriage return as delimiter
+ SchemaMetadataTransform transformCR =
+ new SchemaMetadataTransform(null, null,
"key1=value1\rkey2=value2", "\r");
+ assertThat(transformCR.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+
+ // Test with CRLF (Windows line ending) as delimiter
+ SchemaMetadataTransform transformCRLF =
+ new SchemaMetadataTransform(null, null,
"key1=value1\r\nkey2=value2", "\r\n");
+ assertThat(transformCRLF.getOptions())
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2");
+ }
+}