This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d2cfd789 [FLINK-35985][transform] Correct the substring function in transform rule
1d2cfd789 is described below

commit 1d2cfd789e582c7e864b2b5b4e00c160b2a2386a
Author: MOBIN <18814118...@163.com>
AuthorDate: Thu Nov 7 18:03:01 2024 +0800

    [FLINK-35985][transform] Correct the substring function in transform rule
    
    This closes #3537.
---
 docs/content.zh/docs/core-concept/transform.md     |  3 +-
 docs/content/docs/core-concept/transform.md        | 19 ++++----
 .../cdc/runtime/functions/SystemFunctionUtils.java | 57 +++++++++++++++++++++-
 .../parser/metadata/TransformSqlOperatorTable.java |  1 +
 .../transform/PostTransformOperatorTest.java       | 19 +++++++-
 5 files changed, 86 insertions(+), 13 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 382a7c341..da2a6a4ee 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -126,7 +126,8 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 | LOWER(string) | lower(string) | Returns string in lowercase. |
 | TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
 | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
-| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). |
+| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
+| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
 | CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
 
 ## Temporal Functions
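The table documents `integer1` as a 1-based position. The `substr` implementation this commit replaces (in `SystemFunctionUtils`) handed that value straight to Java's 0-based `String.substring`, which is why the old test expected `SUBSTR('ABC', 1, 1) = 'B'`. A minimal sketch contrasting the two readings for positive positions only; the class and both method names are hypothetical and exist only for illustration:

```java
public class SubstrPositionSketch {

    // Old reading (mirrors the removed lines): the SQL position is used
    // as a 0-based Java offset, so position 1 skips the first character.
    static String zeroBasedSubstr(String str, int beginIndex, int length) {
        return str.substring(beginIndex, beginIndex + length);
    }

    // Hypothetical 1-based reading for positive positions only:
    // position 1 is the first character, and the length is clamped to the
    // characters that remain, so an oversized length runs to the end.
    // Assumes 1 <= position <= str.length() + 1 and length >= 0.
    static String oneBasedSubstr(String str, int position, int length) {
        int start = position - 1;
        int end = start + Math.min(length, str.length() - start);
        return str.substring(start, end);
    }

    public static void main(String[] args) {
        System.out.println(zeroBasedSubstr("ABC", 1, 1));  // B
        System.out.println(oneBasedSubstr("ABC", 1, 1));   // A
        System.out.println(oneBasedSubstr("ABC", 2, 100)); // BC
    }
}
```

Besides the off-by-one, the 0-based version throws `StringIndexOutOfBoundsException` whenever `beginIndex + length` runs past the end (for example `zeroBasedSubstr("ABC", 2, 100)`), whereas the documented behavior is to stop at the end of the string.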
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 382a7c341..e8d58f254 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -118,16 +118,17 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 
 ## String Functions
 
-| Function             | Janino Code              | Description                                       |
-| -------------------- | ------------------------ | ------------------------------------------------- |
-| string1 &#124;&#124; string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. |
-| CHAR_LENGTH(string)  | charLength(string)       | Returns the number of characters in STRING.       |
-| UPPER(string)        | upper(string)            | Returns string in uppercase.                      |
-| LOWER(string) | lower(string) | Returns string in lowercase. |
-| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
+| Function             | Janino Code              | Description                                                                                                                                                                                       |
+| -------------------- | ------------------------ |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| string1 &#124;&#124; string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2.                                                                                                                                                 |
+| CHAR_LENGTH(string)  | charLength(string)       | Returns the number of characters in STRING.                                                                                                                                                       |
+| UPPER(string)        | upper(string)            | Returns string in uppercase.                                                                                                                                                                      |
+| LOWER(string) | lower(string) | Returns string in lowercase.                                                                                                                                                                      |
+| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides.                                                                                                                                          |
 | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
-| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). |
-| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
+| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default).                                                                                       |
+| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default).                                                                                       |
+| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'.                                                                                         |
 
 ## Temporal Functions
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index ba569fc08..649b9b8cc 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -361,11 +361,64 @@ public class SystemFunctionUtils {
     }
 
     public static String substr(String str, int beginIndex) {
-        return str.substring(beginIndex);
+        return substring(str, beginIndex);
     }
 
     public static String substr(String str, int beginIndex, int length) {
-        return str.substring(beginIndex, beginIndex + length);
+        return substring(str, beginIndex, length);
+    }
+
+    public static String substring(String str, int beginIndex) {
+        return substring(str, beginIndex, Integer.MAX_VALUE);
+    }
+
+    public static String substring(String str, int beginIndex, int length) {
+        if (length < 0) {
+            LOG.error(
+                    "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = {}",
+                    length);
+            throw new RuntimeException(
+                    "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = "
+                            + length);
+        }
+        if (length > Integer.MAX_VALUE || beginIndex > Integer.MAX_VALUE) {
+            LOG.error(
+                    "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = {}, beginIndex = {}",
+                    beginIndex,
+                    length);
+            throw new RuntimeException(
+                    "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = "
+                            + beginIndex
+                            + ", beginIndex = "
+                            + length);
+        }
+        if (str.isEmpty()) {
+            return "";
+        }
+
+        int startPos;
+        int endPos;
+
+        if (beginIndex > 0) {
+            startPos = beginIndex - 1;
+            if (startPos >= str.length()) {
+                return "";
+            }
+        } else if (beginIndex < 0) {
+            startPos = str.length() + beginIndex;
+            if (startPos < 0) {
+                return "";
+            }
+        } else {
+            startPos = 0;
+        }
+
+        if ((str.length() - startPos) < length) {
+            endPos = str.length();
+        } else {
+            endPos = startPos + length;
+        }
+        return str.substring(startPos, endPos);
     }
 
     public static String upper(String str) {
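Taken together, the new `substring` overloads resolve a start position (positive: 1-based from the left; negative: counted back from the end; zero: same as the first character), return an empty string when that position falls outside the input, and clamp the length to the characters that remain. The standalone sketch below restates that logic under a hypothetical class name, `SqlSubstringSketch`. It deliberately differs from the diff in three small ways: it returns `null` for a `null` input (an assumption about SQL NULL handling; the committed code would throw a `NullPointerException` at `str.isEmpty()`), it throws `IllegalArgumentException` (a `RuntimeException` subclass) for a negative length, and it omits the `> Integer.MAX_VALUE` comparison, which can never be true for an `int` argument.

```java
public final class SqlSubstringSketch {

    private SqlSubstringSketch() {}

    // One-argument form: "to the end by default" is expressed by passing
    // the largest int as the length, as the committed overload does.
    public static String substring(String str, int beginIndex) {
        return substring(str, beginIndex, Integer.MAX_VALUE);
    }

    public static String substring(String str, int beginIndex, int length) {
        if (length < 0) {
            throw new IllegalArgumentException(
                    "length of 'substring(str, beginIndex, length)' must be >= 0, but length = "
                            + length);
        }
        if (str == null) {
            return null; // assumption: NULL in, NULL out
        }

        // Map the SQL position to a 0-based Java offset.
        int startPos;
        if (beginIndex > 0) {
            startPos = beginIndex - 1;            // 1-based from the left
        } else if (beginIndex < 0) {
            startPos = str.length() + beginIndex; // -1 is the last character
        } else {
            startPos = 0;                         // 0 behaves like 1
        }
        if (startPos < 0 || startPos >= str.length()) {
            return "";                            // also covers the empty string
        }

        // Compare against the remaining characters instead of computing
        // startPos + length up front, which overflows for Integer.MAX_VALUE.
        int endPos = (str.length() - startPos) < length ? str.length() : startPos + length;
        return str.substring(startPos, endPos);
    }

    public static void main(String[] args) {
        System.out.println(substring("ABC", -1));     // C
        System.out.println(substring("ABC", -2, 2));  // BC
        System.out.println(substring("ABC", 0));      // ABC
        System.out.println(substring("ABC", 2, 100)); // BC
    }
}
```

The values printed by `main` correspond to cases added to `PostTransformOperatorTest` in this commit.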
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 658550da0..986337c8a 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -191,6 +191,7 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                                     SqlTypeFamily.INTEGER,
                                     SqlTypeFamily.INTEGER)),
                     SqlFunctionCategory.STRING);
+    public static final SqlFunction SUBSTRING = SqlStdOperatorTable.SUBSTRING;
 
     // ------------------
     // Temporal Functions
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 067842c31..9e0e9843c 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -1499,7 +1499,24 @@ public class PostTransformOperatorTest {
         testExpressionConditionTransform("concat('123', 'abc') = '123abc'");
         testExpressionConditionTransform("upper('abc') = 'ABC'");
         testExpressionConditionTransform("lower('ABC') = 'abc'");
-        testExpressionConditionTransform("SUBSTR('ABC', 1, 1) = 'B'");
+        testExpressionConditionTransform("SUBSTR('ABC', -1) = 'C'");
+        testExpressionConditionTransform("SUBSTR('ABC', -2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 0) = 'ABC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 1) = 'ABC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 2, 100) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', -1) = 'C'");
+        testExpressionConditionTransform("SUBSTRING('ABC', -2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 0) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 1) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 2, 100) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM -1) = 'C'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM -2 FOR 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 0) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 1) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 100) = 'BC'");
         testExpressionConditionTransform("'ABC' like '^[a-zA-Z]'");
         testExpressionConditionTransform("'123' not like '^[a-zA-Z]'");
         testExpressionConditionTransform("abs(2) = 2");
