This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b9756414ba1 [FLINK-39360][table] `LIKE` clause doesn't support some 
patterns
b9756414ba1 is described below

commit b9756414ba1e021ac30d824c5f5ab1b85d0f83ee
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 31 16:35:56 2026 +0200

    [FLINK-39360][table] `LIKE` clause doesn't support some patterns
---
 .../apache/flink/table/functions/SqlLikeUtils.java |   5 +-
 .../planner/codegen/CodeGeneratorContext.scala     |  15 +-
 .../table/planner/codegen/GenerateUtils.scala      |  10 +-
 .../planner/codegen/GeneratedExpression.scala      |   8 +-
 .../table/planner/codegen/calls/LikeCallGen.scala  |  85 +++++----
 .../planner/codegen/calls/ScalarOperatorGens.scala |   4 +-
 .../planner/functions/LikeFunctionITCase.java      | 209 +++++++++++++++++++++
 .../runtime/functions/SqlLikeChainChecker.java     |  16 +-
 8 files changed, 294 insertions(+), 58 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
index 1cbb1b96884..f0b09d31418 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
@@ -18,6 +18,7 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
 
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -140,11 +141,11 @@ public class SqlLikeUtils {
     }
 
     public static RuntimeException invalidEscapeCharacter(String s) {
-        return new RuntimeException("Invalid escape character '" + s + "'");
+        return new ValidationException("Invalid escape character '" + s + "'");
     }
 
     public static RuntimeException invalidEscapeSequence(String s, int i) {
-        return new RuntimeException("Invalid escape sequence '" + s + "', " + 
i);
+        return new ValidationException("Invalid escape sequence '" + s + "', " 
+ i);
     }
 
     private static void similarEscapeRuleChecking(String sqlPattern, char 
escapeChar) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 40fef39856a..45585930daa 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.runtime.util.collections._
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.utils.DateTimeUtils
+import org.apache.flink.table.utils.{DateTimeUtils, EncodingUtils}
 import org.apache.flink.util.InstantiationUtil
 
 import java.time.ZoneId
@@ -988,22 +988,21 @@ class CodeGeneratorContext(
   }
 
   /**
-   * Adds a reusable string constant to the member area of the generated class.
+   * Adds an already pre-escaped string constant to the reusable member area 
of the generated class.
    *
-   * The string must be already escaped with
-   * [[org.apache.flink.table.utils.EncodingUtils.escapeJava()]].
+   * The string must be already escaped with [[EncodingUtils.escapeJava()]].
    */
-  def addReusableEscapedStringConstant(value: String): String = {
-    reusableStringConstants.get(value) match {
+  def addReusablePreEscapedStringConstant(alreadyEscapedValue: String): String 
= {
+    reusableStringConstants.get(alreadyEscapedValue) match {
       case Some(field) => field
       case None =>
         val field = newName(this, "str")
         val stmt =
           s"""
-             |private final $BINARY_STRING $field = 
$BINARY_STRING.fromString("$value");
+             |private final $BINARY_STRING $field = 
$BINARY_STRING.fromString("$alreadyEscapedValue");
            """.stripMargin
         reusableMemberStatements.add(stmt)
-        reusableStringConstants(value) = field
+        reusableStringConstants(alreadyEscapedValue) = field
         field
     }
   }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 865404ac7e8..9cfae27fe44 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -306,10 +306,12 @@ object GenerateUtils {
       // as they're not cheap to construct. For the other types, the return 
term is directly
       // the literal value
       case CHAR | VARCHAR =>
-        val escapedValue =
-          
EncodingUtils.escapeJava(literalValue.asInstanceOf[BinaryStringData].toString)
-        val field = ctx.addReusableEscapedStringConstant(escapedValue)
-        generateNonNullLiteral(literalType, field, 
StringData.fromString(escapedValue))
+        val str = literalValue.asInstanceOf[BinaryStringData]
+        val field = 
ctx.addReusablePreEscapedStringConstant(EncodingUtils.escapeJava(str.toString))
+        // The original value should be passed as literalValue
+        // all required escaping should be done in corresponding code 
generation,
+        // so that the literalValue can be also used directly when needed
+        generateNonNullLiteral(literalType, field, str)
 
       case BINARY | VARBINARY =>
         val bytesVal = literalValue.asInstanceOf[Array[Byte]]
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
index 108ea10d1ad..325a7608c27 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
@@ -33,8 +33,12 @@ import org.apache.flink.table.types.logical.LogicalType
  * @param resultType
  *   type of the resultTerm
  * @param literalValue
- *   None if the expression is not literal. Otherwise it represent the 
original object of the
- *   literal.
+ *   Contains the literal value (as internal data structure) for deep literal 
inspection if the
+ *   originating expression was a literal. Literal inspection is useful for 
performance
+ *   optimizations. For example, figuring out whether a time parsing function 
ever produces
+ *   sub-second data by inspecting the "format" literal string. NOTE: The 
literal value is not
+ *   intended to be used in generated code, use `resultTerm` for this purpose. 
The literal value is
+ *   NOT escaped.
  */
 case class GeneratedExpression(
     resultTerm: String,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
index b3dc9a1911c..7d05ad13d02 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
@@ -23,6 +23,7 @@ import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName,
 import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
 import org.apache.flink.table.runtime.functions.SqlLikeChainChecker
 import org.apache.flink.table.types.logical.{BooleanType, LogicalType}
+import org.apache.flink.table.utils.EncodingUtils
 
 import java.util.regex.Pattern
 
@@ -57,44 +58,44 @@ class LikeCallGen extends CallGenerator {
             !pattern.contains("_")
           } else {
             val escape = operands(2).literalValue.get.toString
-            if ((escape.length == 2 && escape.charAt(0) != '\\') || 
escape.length > 2) {
-              throw SqlLikeUtils.invalidEscapeCharacter(escape)
-            }
-            val escapeChar = escape.charAt(escape.length - 1)
-            var matched = true
-            var i = 0
-            val newBuilder = new StringBuilder
-            while (i < pattern.length && matched) {
-              var c = pattern.charAt(i)
-              if (c == '\\') {
-                i += 1
-                c = pattern.charAt(i)
+            if (escape.isEmpty) {
+              !pattern.contains("_")
+            } else {
+              if (escape.length > 1) {
+                throw SqlLikeUtils.invalidEscapeCharacter(escape)
               }
-              if (c == escapeChar) {
-                if (i == (pattern.length - 1)) {
-                  throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
-                }
-                val nextChar = pattern.charAt(i + 1)
-                if (nextChar == '%') {
+              val escapeChar = escape.charAt(escape.length - 1)
+              var matched = true
+              var i = 0
+              val newBuilder = new StringBuilder
+              while (i < pattern.length && matched) {
+                val c = pattern.charAt(i)
+                if (c == escapeChar) {
+                  if (i == (pattern.length - 1)) {
+                    throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
+                  }
+                  val nextChar = pattern.charAt(i + 1)
+                  if (nextChar == '%') {
+                    matched = false
+                  } else if ((nextChar == '_') || (nextChar == escapeChar)) {
+                    newBuilder.append(nextChar)
+                    i += 1
+                  } else {
+                    throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
+                  }
+                } else if (c == '_') {
                   matched = false
-                } else if ((nextChar == '_') || (nextChar == escapeChar)) {
-                  newBuilder.append(nextChar)
-                  i += 1
                 } else {
-                  throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
+                  newBuilder.append(c)
                 }
-              } else if (c == '_') {
-                matched = false
-              } else {
-                newBuilder.append(c)
+                i += 1
               }
-              i += 1
-            }
 
-            if (matched) {
-              newPattern = newBuilder.toString
+              if (matched) {
+                newPattern = newBuilder.toString
+              }
+              matched
             }
-            matched
           }
 
           if (allowQuick) {
@@ -102,23 +103,28 @@ class LikeCallGen extends CallGenerator {
             val beginMatcher = BEGIN_PATTERN.matcher(newPattern)
             val endMatcher = END_PATTERN.matcher(newPattern)
             val middleMatcher = MIDDLE_PATTERN.matcher(newPattern)
+            val escapedNewPattern = EncodingUtils.escapeJava(newPattern)
 
             if (noneMatcher.matches()) {
-              val reusePattern = 
ctx.addReusableEscapedStringConstant(newPattern)
+              val reusePattern = 
ctx.addReusablePreEscapedStringConstant(escapedNewPattern)
               s"${terms.head}.equals($reusePattern)"
             } else if (beginMatcher.matches()) {
-              val field = 
ctx.addReusableEscapedStringConstant(beginMatcher.group(1))
+              val escapedStartValue = 
EncodingUtils.escapeJava(beginMatcher.group(1))
+              val field = 
ctx.addReusablePreEscapedStringConstant(escapedStartValue)
               s"${terms.head}.startsWith($field)"
             } else if (endMatcher.matches()) {
-              val field = 
ctx.addReusableEscapedStringConstant(endMatcher.group(1))
+              val escapedEndValue = 
EncodingUtils.escapeJava(endMatcher.group(1))
+              val field = 
ctx.addReusablePreEscapedStringConstant(escapedEndValue)
               s"${terms.head}.endsWith($field)"
             } else if (middleMatcher.matches()) {
-              val field = 
ctx.addReusableEscapedStringConstant(middleMatcher.group(1))
+              val escapedMiddleValue = 
EncodingUtils.escapeJava(middleMatcher.group(1))
+              val field = 
ctx.addReusablePreEscapedStringConstant(escapedMiddleValue)
               s"${terms.head}.contains($field)"
             } else {
               val field = className[SqlLikeChainChecker]
               val checker = newName(ctx, "likeChainChecker")
-              ctx.addReusableMember(s"$field $checker = new 
$field(${"\""}$newPattern${"\""});")
+              ctx.addReusableMember(
+                s"$field $checker = new 
$field(${"\""}$escapedNewPattern${"\""});")
               s"$checker.check(${terms.head})"
             }
           } else {
@@ -129,15 +135,18 @@ class LikeCallGen extends CallGenerator {
             val escape = if (operands.size == 2) {
               "null"
             } else {
+              val escapedEscapeLiteral =
+                EncodingUtils.escapeJava(operands(2).literalValue.get.toString)
               s"""
-                 |"${operands(2).literalValue.get}"
+                 |"$escapedEscapeLiteral"
                """.stripMargin
             }
+            val escapedPatternLiteral = EncodingUtils.escapeJava(pattern)
             ctx.addReusableMember(
               s"""
                  |$patternClass $patternName =
                  |  $patternClass.compile(
-                 |    
$likeClass.sqlToRegexLike("${operands(1).literalValue.get}", $escape));
+                 |    $likeClass.sqlToRegexLike("$escapedPatternLiteral", 
$escape));
                  |""".stripMargin)
             s"$patternName.matcher(${terms.head}.toString()).matches()"
           }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 208c0d8d92c..0373d2a0df0 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.codegen.calls
 
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.data.binary.BinaryArrayData
+import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryStringData}
 import org.apache.flink.table.data.util.MapDataUtil
 import org.apache.flink.table.data.utils.CastExecutor
 import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
@@ -41,6 +41,7 @@ import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldTypes, 
getPrecision, getScale}
 import 
org.apache.flink.table.types.logical.utils.LogicalTypeMerging.findCommonType
 import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.types.ColumnList
 import org.apache.flink.util.Preconditions.checkArgument
 
@@ -1803,6 +1804,7 @@ object ScalarOperatorGens {
     }
 
     try {
+      // No escaping here as it will be done in the primitiveLiteralForType 
according to the type of the literal value.
       val result = castExecutor.cast(literalExpr.literalValue.get)
       val resultTerm = newName(ctx, "stringToTime")
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/LikeFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/LikeFunctionITCase.java
new file mode 100644
index 00000000000..f2c29eae578
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/LikeFunctionITCase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import java.util.stream.Stream;
+
+/** Integration tests for {@code LIKE <pattern> [ESCAPE <escape>]} pattern 
matching operations. */
+class LikeFunctionITCase extends BuiltInFunctionTestBase {
+
+    @Override
+    Stream<TestSetSpec> getTestSetSpecs() {
+        return Stream.of(withEscape(), withoutEscape()).flatMap(s -> s);
+    }
+
+    private Stream<TestSetSpec> withoutEscape() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+                        .onFieldsWithData("test", "t\"est", "tes\"t", 
"t\"es\"t")
+                        .andDataTypes(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING())
+
+                        // Multiple % with quote in middle segment
+                        .testSqlResult("f0 LIKE 'a%b\"c%d'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE 't%es%t'", true, 
DataTypes.BOOLEAN())
+
+                        // Quote in first segment
+                        .testSqlResult("f0 LIKE 'a\"b%c%d'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f1 LIKE 't\"e%s%t'", true, 
DataTypes.BOOLEAN())
+
+                        // Quote in last segment
+                        .testSqlResult("f0 LIKE 'a%b%c\"d'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE 't%e%s\"t'", true, 
DataTypes.BOOLEAN())
+
+                        // Multiple quotes
+                        .testSqlResult("f0 LIKE 'a\"%b\"%c'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f3 LIKE 't\"%s\"%t'", true, 
DataTypes.BOOLEAN())
+
+                        // Pattern with underscore and quote
+                        .testSqlResult("f0 LIKE 'te_t\"'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE 'te_\"t'", true, 
DataTypes.BOOLEAN())
+
+                        // Multiple underscores with quotes
+                        .testSqlResult("f0 LIKE '_\"_test_\"_'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f3 LIKE '_\"__\"_'", true, 
DataTypes.BOOLEAN()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+                        .onFieldsWithData("test", "abc%def", "test_123", 
"hello'world")
+                        .andDataTypes(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING())
+
+                        // Normal exact match - no special chars
+                        .testSqlResult("f0 LIKE 'test'", true, 
DataTypes.BOOLEAN())
+
+                        // Normal exact match - in case of empty strings
+                        .testSqlResult("'' LIKE ''", true, 
DataTypes.BOOLEAN().notNull())
+                        .testSqlResult("'' LIKE '%'", true, 
DataTypes.BOOLEAN().notNull())
+                        .testSqlResult("f0 LIKE ''", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE '%%'", true, 
DataTypes.BOOLEAN())
+
+                        // Starts with pattern
+                        .testSqlResult("f0 LIKE 'te%'", true, 
DataTypes.BOOLEAN())
+
+                        // Ends with pattern
+                        .testSqlResult("f0 LIKE '%st'", true, 
DataTypes.BOOLEAN())
+
+                        // Contains pattern
+                        .testSqlResult("f0 LIKE '%es%'", true, 
DataTypes.BOOLEAN())
+
+                        // Single quote in data (not pattern)
+                        // SQL escapes single quote as ''
+                        .testSqlResult("f3 LIKE '%''%'", true, 
DataTypes.BOOLEAN())
+
+                        // Pattern with % in data matches literal
+                        .testSqlResult("f1 LIKE 'abc%def'", true, 
DataTypes.BOOLEAN())
+
+                        // Pattern doesn't match
+                        .testSqlResult("f0 LIKE 'orange'", false, 
DataTypes.BOOLEAN()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+                        .onFieldsWithData("test")
+                        .andDataTypes(DataTypes.STRING())
+
+                        // With backslash and double quote in the middle
+                        .testSqlResult("f0 LIKE 'test\\\"more'", false, 
DataTypes.BOOLEAN())
+
+                        // With backslash at the end
+                        .testSqlResult("f0 LIKE 'test\\\\'", false, 
DataTypes.BOOLEAN()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+                        .onFieldsWithData("test", "\"test", "te\"st", 
"test\"", "test\\")
+                        .andDataTypes(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING())
+
+                        // Quick path
+                        .testSqlResult("f0 LIKE 'test\"quote'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE 'te\"st'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE '\"test'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f1 LIKE '\"test'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE 'test\"'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f3 LIKE 'test\"'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE 'start\"test%'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE 'te\"s%'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE '%test\"end'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE '%te\"st'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE '%mid\"dle%'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE '%te\"st%'", true, 
DataTypes.BOOLEAN())
+
+                        // Trailing backslash
+                        .testSqlResult("f0 LIKE 'test\\'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f4 LIKE 'test\\'", true, 
DataTypes.BOOLEAN()));
+    }
+
+    private Stream<TestSetSpec> withEscape() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+                        .onFieldsWithData("test", "test%", "te_st", "te\"st", 
"test\\", "✅test✅")
+                        .andDataTypes(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING())
+                        // Empty strings in pattern or escape
+                        .testSqlResult("f0 LIKE 'test\"end' ESCAPE ''", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE '' ESCAPE ''", false, 
DataTypes.BOOLEAN())
+                        // Escaping with emoji
+                        .testSqlResult("f0 LIKE 'test' ESCAPE '✅'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f1 LIKE 'test✅%' ESCAPE '✅'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f1 LIKE 'test!%' ESCAPE '!'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE '✅test' ESCAPE '!'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f0 LIKE '✅test' ESCAPE '\\'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f5 LIKE '✅test✅' ESCAPE '\\'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f5 LIKE '✅%✅' ESCAPE '\\'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f5 LIKE '✅%' ESCAPE '\\'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f5 LIKE '%st✅' ESCAPE '\\'", true, 
DataTypes.BOOLEAN())
+                        // Mixed escaped symbols
+                        .testSqlResult("f2 LIKE 'te_st' ESCAPE '!'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE 'te__st' ESCAPE '_'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f1 LIKE 'test_%' ESCAPE '_'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f2 LIKE 'te%_st' ESCAPE '%'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f1 LIKE 'test%%' ESCAPE '%'", true, 
DataTypes.BOOLEAN())
+                        .testSqlValidationError(
+                                "f2 LIKE 'te_st' ESCAPE '_'", "Invalid escape 
sequence 'te_st', 2")
+                        .testSqlValidationError(
+                                "f1 LIKE 'test_' ESCAPE '_'", "Invalid escape 
sequence 'test_', 4")
+                        .testSqlValidationError(
+                                "f2 LIKE 'te%st' ESCAPE '%'", "Invalid escape 
sequence 'te%st', 2")
+                        .testSqlValidationError(
+                                "f1 LIKE 'test%' ESCAPE '%'", "Invalid escape 
sequence 'test%', 4")
+                        .testSqlValidationError(
+                                "f0 LIKE 'test\\\"end' ESCAPE '\\'",
+                                "Invalid escape sequence 'test\\\"end', 4")
+                        .testSqlValidationError(
+                                "f0 LIKE '%e_t%' ESCAPE 'ab'", "Invalid escape 
character 'ab'")
+                        // Mixed
+                        .testSqlResult("f0 LIKE 'test\"end' ESCAPE '!'", 
false, DataTypes.BOOLEAN())
+                        .testSqlResult("f3 LIKE 'te\"st' ESCAPE '!'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult("f4 LIKE 'test\\' ESCAPE '!'", true, 
DataTypes.BOOLEAN())
+                        .testSqlResult(
+                                "'a1bc' LIKE CAST('a%\"+1+\"b%c' AS STRING) 
ESCAPE '!'",
+                                false, DataTypes.BOOLEAN().notNull())
+                        .testSqlResult(
+                                "'a1\"+1+\"bc' LIKE CAST('a%\"+1+\"b%c' AS 
STRING) ESCAPE '!'",
+                                true, DataTypes.BOOLEAN().notNull())
+                        // Unicode like sequence
+                        .testSqlResult(
+                                "f0 LIKE 'test" + "\\u" + "000Aend' ESCAPE 
'!'",
+                                false,
+                                DataTypes.BOOLEAN())
+                        .testSqlResult(
+                                "'test\\u000Aend' LIKE 'test" + "\\u" + 
"000Aend' ESCAPE '!'",
+                                true,
+                                DataTypes.BOOLEAN().notNull())
+                        // Special characters
+                        .testSqlResult(
+                                "f0 LIKE '\btest\ne\\nd\f' ESCAPE '!'", false, 
DataTypes.BOOLEAN())
+                        .testSqlResult(
+                                "'\btest\ne\\nd\f' LIKE '\btest\ne\\nd\f' 
ESCAPE '!'",
+                                true,
+                                DataTypes.BOOLEAN().notNull()));
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
index 3be1666a1db..06068031bfc 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
@@ -45,11 +45,13 @@ public class SqlLikeChainChecker {
     private final int[] midLens;
     private final int beginLen;
     private final int endLen;
+    private final boolean leftAnchor;
+    private final boolean rightAnchor;
 
     public SqlLikeChainChecker(String pattern) {
         final StringTokenizer tokens = new StringTokenizer(pattern, "%");
-        final boolean leftAnchor = !pattern.startsWith("%");
-        final boolean rightAnchor = !pattern.endsWith("%");
+        leftAnchor = !pattern.startsWith("%");
+        rightAnchor = !pattern.endsWith("%");
         int len = 0;
         // at least 2 checkers always
         BinaryStringData leftPattern = null;
@@ -93,7 +95,15 @@ public class SqlLikeChainChecker {
         MemorySegment[] segments = str.getSegments();
         int pos = str.getOffset();
         int mark = str.getSizeInBytes();
-        if (str.getSizeInBytes() < minLen) {
+        // Returns false early if either:
+        // the input is too short to match the pattern, or
+        // the pattern is empty (or anchored with no literals) but the input 
is not empty.
+        if (mark < minLen
+                || beginPattern == null
+                        && endPattern == null
+                        && middlePatterns.length == 0
+                        && mark > 0
+                        && (leftAnchor || rightAnchor)) {
             return false;
         }
         // prefix, extend start

Reply via email to