This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 1d2cfd789 [FLINK-35985][transform] Correct the substring function in transform rule 1d2cfd789 is described below commit 1d2cfd789e582c7e864b2b5b4e00c160b2a2386a Author: MOBIN <18814118...@163.com> AuthorDate: Thu Nov 7 18:03:01 2024 +0800 [FLINK-35985][transform] Correct the substring function in transform rule This closes #3537. --- docs/content.zh/docs/core-concept/transform.md | 3 +- docs/content/docs/core-concept/transform.md | 19 ++++---- .../cdc/runtime/functions/SystemFunctionUtils.java | 57 +++++++++++++++++++++- .../parser/metadata/TransformSqlOperatorTable.java | 1 + .../transform/PostTransformOperatorTest.java | 19 +++++++- 5 files changed, 86 insertions(+), 13 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index 382a7c341..da2a6a4ee 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -126,7 +126,8 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | LOWER(string) | lower(string) | Returns string in lowercase. | | TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | -| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | +| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). 
| +| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | | CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | ## Temporal Functions diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 382a7c341..e8d58f254 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -118,16 +118,17 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ ## String Functions -| Function | Janino Code | Description | -| -------------------- | ------------------------ | ------------------------------------------------- | -| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. | -| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. | -| UPPER(string) | upper(string) | Returns string in uppercase. | -| LOWER(string) | lower(string) | Returns string in lowercase. | -| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | +| Function | Janino Code | Description | +| -------------------- | ------------------------ |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. | +| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. | +| UPPER(string) | upper(string) | Returns string in uppercase. | +| LOWER(string) | lower(string) | Returns string in lowercase. 
| +| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | -| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | -| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | +| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | +| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | +| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. 
| ## Temporal Functions diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index ba569fc08..649b9b8cc 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -361,11 +361,64 @@ public class SystemFunctionUtils { } public static String substr(String str, int beginIndex) { - return str.substring(beginIndex); + return substring(str, beginIndex); } public static String substr(String str, int beginIndex, int length) { - return str.substring(beginIndex, beginIndex + length); + return substring(str, beginIndex, length); + } + + public static String substring(String str, int beginIndex) { + return substring(str, beginIndex, Integer.MAX_VALUE); + } + + public static String substring(String str, int beginIndex, int length) { + if (length < 0) { + LOG.error( + "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = {}", + length); + throw new RuntimeException( + "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = " + + length); + } + if (length > Integer.MAX_VALUE || beginIndex > Integer.MAX_VALUE) { + LOG.error( + "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = {}, beginIndex = {}", + length, + beginIndex); + throw new RuntimeException( + "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = " + + length + + ", beginIndex = " + + beginIndex); + } + if (str.isEmpty()) { + return ""; + } + + int startPos; + int endPos; + + if (beginIndex > 0) { + startPos = beginIndex - 1; + if (startPos >= str.length()) { + return ""; + 
} + } else { + startPos = 0; + } + + if ((str.length() - startPos) < length) { + endPos = str.length(); + } else { + endPos = startPos + length; + } + return str.substring(startPos, endPos); } public static String upper(String str) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 658550da0..986337c8a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -191,6 +191,7 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER)), SqlFunctionCategory.STRING); + public static final SqlFunction SUBSTRING = SqlStdOperatorTable.SUBSTRING; // ------------------ // Temporal Functions diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 067842c31..9e0e9843c 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -1499,7 +1499,24 @@ public class PostTransformOperatorTest { testExpressionConditionTransform("concat('123', 'abc') = '123abc'"); testExpressionConditionTransform("upper('abc') = 'ABC'"); testExpressionConditionTransform("lower('ABC') = 'abc'"); - testExpressionConditionTransform("SUBSTR('ABC', 1, 1) = 'B'"); + testExpressionConditionTransform("SUBSTR('ABC', -1) = 'C'"); + testExpressionConditionTransform("SUBSTR('ABC', -2, 2) = 'BC'"); + 
testExpressionConditionTransform("SUBSTR('ABC', 0) = 'ABC'"); + testExpressionConditionTransform("SUBSTR('ABC', 1) = 'ABC'"); + testExpressionConditionTransform("SUBSTR('ABC', 2, 2) = 'BC'"); + testExpressionConditionTransform("SUBSTR('ABC', 2, 100) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC', -1) = 'C'"); + testExpressionConditionTransform("SUBSTRING('ABC', -2, 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 0) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 1) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 2, 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 2, 100) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM -1) = 'C'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM -2 FOR 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 0) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 1) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 100) = 'BC'"); testExpressionConditionTransform("'ABC' like '^[a-zA-Z]'"); testExpressionConditionTransform("'123' not like '^[a-zA-Z]'"); testExpressionConditionTransform("abs(2) = 2");