This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push:
new 528e4372204 [FLINK-39360][table] `LIKE` clause doesn't support some
patterns
528e4372204 is described below
commit 528e4372204b762ebc5edc9be620eec353e5ab6c
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 31 18:47:10 2026 +0200
[FLINK-39360][table] `LIKE` clause doesn't support some patterns
---
.../apache/flink/table/functions/SqlLikeUtils.java | 5 +-
.../planner/codegen/CodeGeneratorContext.scala | 15 +-
.../table/planner/codegen/GenerateUtils.scala | 10 +-
.../planner/codegen/GeneratedExpression.scala | 8 +-
.../table/planner/codegen/calls/LikeCallGen.scala | 85 +++++----
.../planner/codegen/calls/ScalarOperatorGens.scala | 4 +-
.../planner/functions/LikeFunctionITCase.java | 209 +++++++++++++++++++++
.../runtime/functions/SqlLikeChainChecker.java | 16 +-
8 files changed, 294 insertions(+), 58 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
index 1cbb1b96884..f0b09d31418 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/SqlLikeUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.functions;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -140,11 +141,11 @@ public class SqlLikeUtils {
}
public static RuntimeException invalidEscapeCharacter(String s) {
- return new RuntimeException("Invalid escape character '" + s + "'");
+ return new ValidationException("Invalid escape character '" + s + "'");
}
public static RuntimeException invalidEscapeSequence(String s, int i) {
- return new RuntimeException("Invalid escape sequence '" + s + "', " +
i);
+ return new ValidationException("Invalid escape sequence '" + s + "', "
+ i);
}
private static void similarEscapeRuleChecking(String sqlPattern, char
escapeChar) {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 7cbdb50b134..b7951c2c8e8 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.runtime.util.collections._
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.utils.DateTimeUtils
+import org.apache.flink.table.utils.{DateTimeUtils, EncodingUtils}
import org.apache.flink.util.InstantiationUtil
import java.time.ZoneId
@@ -982,22 +982,21 @@ class CodeGeneratorContext(
}
/**
- * Adds a reusable string constant to the member area of the generated class.
+ * Adds a pre-escaped string constant to the reusable member area
of the generated class.
*
- * The string must be already escaped with
- * [[org.apache.flink.table.utils.EncodingUtils.escapeJava()]].
+ * The string must be already escaped with [[EncodingUtils.escapeJava()]].
*/
- def addReusableEscapedStringConstant(value: String): String = {
- reusableStringConstants.get(value) match {
+ def addReusablePreEscapedStringConstant(alreadyEscapedValue: String): String
= {
+ reusableStringConstants.get(alreadyEscapedValue) match {
case Some(field) => field
case None =>
val field = newName(this, "str")
val stmt =
s"""
- |private final $BINARY_STRING $field =
$BINARY_STRING.fromString("$value");
+ |private final $BINARY_STRING $field =
$BINARY_STRING.fromString("$alreadyEscapedValue");
""".stripMargin
reusableMemberStatements.add(stmt)
- reusableStringConstants(value) = field
+ reusableStringConstants(alreadyEscapedValue) = field
field
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
index 9e3f9976f83..6c6c1f5d4fe 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
@@ -306,10 +306,12 @@ object GenerateUtils {
// as they're not cheap to construct. For the other types, the return
term is directly
// the literal value
case CHAR | VARCHAR =>
- val escapedValue =
-
EncodingUtils.escapeJava(literalValue.asInstanceOf[BinaryStringData].toString)
- val field = ctx.addReusableEscapedStringConstant(escapedValue)
- generateNonNullLiteral(literalType, field,
StringData.fromString(escapedValue))
+ val str = literalValue.asInstanceOf[BinaryStringData]
+ val field =
ctx.addReusablePreEscapedStringConstant(EncodingUtils.escapeJava(str.toString))
+ // The original (unescaped) value should be passed as literalValue.
+ // All required escaping should be done in the corresponding code
generation,
+ // so that the literalValue can also be used directly when needed.
+ generateNonNullLiteral(literalType, field, str)
case BINARY | VARBINARY =>
val bytesVal = literalValue.asInstanceOf[Array[Byte]]
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
index 108ea10d1ad..325a7608c27 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GeneratedExpression.scala
@@ -33,8 +33,12 @@ import org.apache.flink.table.types.logical.LogicalType
* @param resultType
* type of the resultTerm
* @param literalValue
- * None if the expression is not literal. Otherwise it represent the
original object of the
- * literal.
+ * Contains the literal value (as internal data structure) for deep literal
inspection if the
+ * originating expression was a literal. Literal inspection is useful for
performance
+ * optimizations. For example, figuring out whether a time parsing function
ever produces
+ * sub-second data by inspecting the "format" literal string. NOTE: The
literal value is not
+ * intended to be used in generated code, use `resultTerm` for this purpose.
The literal value is
+ * NOT escaped.
*/
case class GeneratedExpression(
resultTerm: String,
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
index b3dc9a1911c..7d05ad13d02 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/LikeCallGen.scala
@@ -23,6 +23,7 @@ import
org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName,
import
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
import org.apache.flink.table.runtime.functions.SqlLikeChainChecker
import org.apache.flink.table.types.logical.{BooleanType, LogicalType}
+import org.apache.flink.table.utils.EncodingUtils
import java.util.regex.Pattern
@@ -57,44 +58,44 @@ class LikeCallGen extends CallGenerator {
!pattern.contains("_")
} else {
val escape = operands(2).literalValue.get.toString
- if ((escape.length == 2 && escape.charAt(0) != '\\') ||
escape.length > 2) {
- throw SqlLikeUtils.invalidEscapeCharacter(escape)
- }
- val escapeChar = escape.charAt(escape.length - 1)
- var matched = true
- var i = 0
- val newBuilder = new StringBuilder
- while (i < pattern.length && matched) {
- var c = pattern.charAt(i)
- if (c == '\\') {
- i += 1
- c = pattern.charAt(i)
+ if (escape.isEmpty) {
+ !pattern.contains("_")
+ } else {
+ if (escape.length > 1) {
+ throw SqlLikeUtils.invalidEscapeCharacter(escape)
}
- if (c == escapeChar) {
- if (i == (pattern.length - 1)) {
- throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
- }
- val nextChar = pattern.charAt(i + 1)
- if (nextChar == '%') {
+ val escapeChar = escape.charAt(escape.length - 1)
+ var matched = true
+ var i = 0
+ val newBuilder = new StringBuilder
+ while (i < pattern.length && matched) {
+ val c = pattern.charAt(i)
+ if (c == escapeChar) {
+ if (i == (pattern.length - 1)) {
+ throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
+ }
+ val nextChar = pattern.charAt(i + 1)
+ if (nextChar == '%') {
+ matched = false
+ } else if ((nextChar == '_') || (nextChar == escapeChar)) {
+ newBuilder.append(nextChar)
+ i += 1
+ } else {
+ throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
+ }
+ } else if (c == '_') {
matched = false
- } else if ((nextChar == '_') || (nextChar == escapeChar)) {
- newBuilder.append(nextChar)
- i += 1
} else {
- throw SqlLikeUtils.invalidEscapeSequence(pattern, i)
+ newBuilder.append(c)
}
- } else if (c == '_') {
- matched = false
- } else {
- newBuilder.append(c)
+ i += 1
}
- i += 1
- }
- if (matched) {
- newPattern = newBuilder.toString
+ if (matched) {
+ newPattern = newBuilder.toString
+ }
+ matched
}
- matched
}
if (allowQuick) {
@@ -102,23 +103,28 @@ class LikeCallGen extends CallGenerator {
val beginMatcher = BEGIN_PATTERN.matcher(newPattern)
val endMatcher = END_PATTERN.matcher(newPattern)
val middleMatcher = MIDDLE_PATTERN.matcher(newPattern)
+ val escapedNewPattern = EncodingUtils.escapeJava(newPattern)
if (noneMatcher.matches()) {
- val reusePattern =
ctx.addReusableEscapedStringConstant(newPattern)
+ val reusePattern =
ctx.addReusablePreEscapedStringConstant(escapedNewPattern)
s"${terms.head}.equals($reusePattern)"
} else if (beginMatcher.matches()) {
- val field =
ctx.addReusableEscapedStringConstant(beginMatcher.group(1))
+ val escapedStartValue =
EncodingUtils.escapeJava(beginMatcher.group(1))
+ val field =
ctx.addReusablePreEscapedStringConstant(escapedStartValue)
s"${terms.head}.startsWith($field)"
} else if (endMatcher.matches()) {
- val field =
ctx.addReusableEscapedStringConstant(endMatcher.group(1))
+ val escapedEndValue =
EncodingUtils.escapeJava(endMatcher.group(1))
+ val field =
ctx.addReusablePreEscapedStringConstant(escapedEndValue)
s"${terms.head}.endsWith($field)"
} else if (middleMatcher.matches()) {
- val field =
ctx.addReusableEscapedStringConstant(middleMatcher.group(1))
+ val escapedMiddleValue =
EncodingUtils.escapeJava(middleMatcher.group(1))
+ val field =
ctx.addReusablePreEscapedStringConstant(escapedMiddleValue)
s"${terms.head}.contains($field)"
} else {
val field = className[SqlLikeChainChecker]
val checker = newName(ctx, "likeChainChecker")
- ctx.addReusableMember(s"$field $checker = new
$field(${"\""}$newPattern${"\""});")
+ ctx.addReusableMember(
+ s"$field $checker = new
$field(${"\""}$escapedNewPattern${"\""});")
s"$checker.check(${terms.head})"
}
} else {
@@ -129,15 +135,18 @@ class LikeCallGen extends CallGenerator {
val escape = if (operands.size == 2) {
"null"
} else {
+ val escapedEscapeLiteral =
+ EncodingUtils.escapeJava(operands(2).literalValue.get.toString)
s"""
- |"${operands(2).literalValue.get}"
+ |"$escapedEscapeLiteral"
""".stripMargin
}
+ val escapedPatternLiteral = EncodingUtils.escapeJava(pattern)
ctx.addReusableMember(
s"""
|$patternClass $patternName =
| $patternClass.compile(
- |
$likeClass.sqlToRegexLike("${operands(1).literalValue.get}", $escape));
+ | $likeClass.sqlToRegexLike("$escapedPatternLiteral",
$escape));
|""".stripMargin)
s"$patternName.matcher(${terms.head}.toString()).matches()"
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 1bea73703f8..e5cfb343978 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.codegen.calls
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.data.binary.BinaryArrayData
+import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryStringData}
import org.apache.flink.table.data.util.MapDataUtil
import org.apache.flink.table.data.utils.CastExecutor
import org.apache.flink.table.data.writer.{BinaryArrayWriter, BinaryRowWriter}
@@ -41,6 +41,7 @@ import
org.apache.flink.table.types.logical.utils.LogicalTypeChecks
import
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldTypes,
getPrecision, getScale}
import
org.apache.flink.table.types.logical.utils.LogicalTypeMerging.findCommonType
import org.apache.flink.table.utils.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.flink.table.utils.EncodingUtils
import org.apache.flink.types.ColumnList
import org.apache.flink.util.Preconditions.checkArgument
@@ -1693,6 +1694,7 @@ object ScalarOperatorGens {
}
try {
+ // No escaping here as it will be done in the primitiveLiteralForType
according to the type of the literal value.
val result = castExecutor.cast(literalExpr.literalValue.get)
val resultTerm = newName(ctx, "stringToTime")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/LikeFunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/LikeFunctionITCase.java
new file mode 100644
index 00000000000..f2c29eae578
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/LikeFunctionITCase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import java.util.stream.Stream;
+
+/** Integration tests for {@code LIKE <pattern> [ESCAPE <escape>]} pattern
matching operations. */
+class LikeFunctionITCase extends BuiltInFunctionTestBase {
+
+ @Override
+ Stream<TestSetSpec> getTestSetSpecs() {
+ return Stream.of(withEscape(), withoutEscape()).flatMap(s -> s);
+ }
+
+ private Stream<TestSetSpec> withoutEscape() {
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+ .onFieldsWithData("test", "t\"est", "tes\"t",
"t\"es\"t")
+ .andDataTypes(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING())
+
+ // Multiple % with quote in middle segment
+ .testSqlResult("f0 LIKE 'a%b\"c%d'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE 't%es%t'", true,
DataTypes.BOOLEAN())
+
+ // Quote in first segment
+ .testSqlResult("f0 LIKE 'a\"b%c%d'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f1 LIKE 't\"e%s%t'", true,
DataTypes.BOOLEAN())
+
+ // Quote in last segment
+ .testSqlResult("f0 LIKE 'a%b%c\"d'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE 't%e%s\"t'", true,
DataTypes.BOOLEAN())
+
+ // Multiple quotes
+ .testSqlResult("f0 LIKE 'a\"%b\"%c'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f3 LIKE 't\"%s\"%t'", true,
DataTypes.BOOLEAN())
+
+ // Pattern with underscore and quote
+ .testSqlResult("f0 LIKE 'te_t\"'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE 'te_\"t'", true,
DataTypes.BOOLEAN())
+
+ // Multiple underscores with quotes
+ .testSqlResult("f0 LIKE '_\"_test_\"_'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f3 LIKE '_\"__\"_'", true,
DataTypes.BOOLEAN()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+ .onFieldsWithData("test", "abc%def", "test_123",
"hello'world")
+ .andDataTypes(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING())
+
+ // Normal exact match - no special chars
+ .testSqlResult("f0 LIKE 'test'", true,
DataTypes.BOOLEAN())
+
+ // Normal exact match - in case of empty strings
+ .testSqlResult("'' LIKE ''", true,
DataTypes.BOOLEAN().notNull())
+ .testSqlResult("'' LIKE '%'", true,
DataTypes.BOOLEAN().notNull())
+ .testSqlResult("f0 LIKE ''", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE '%%'", true,
DataTypes.BOOLEAN())
+
+ // Starts with pattern
+ .testSqlResult("f0 LIKE 'te%'", true,
DataTypes.BOOLEAN())
+
+ // Ends with pattern
+ .testSqlResult("f0 LIKE '%st'", true,
DataTypes.BOOLEAN())
+
+ // Contains pattern
+ .testSqlResult("f0 LIKE '%es%'", true,
DataTypes.BOOLEAN())
+
+ // Single quote in both data and pattern
+ // (SQL escapes a single quote inside a string literal as '')
+ .testSqlResult("f3 LIKE '%''%'", true,
DataTypes.BOOLEAN())
+
+ // Pattern with % in data matches literal
+ .testSqlResult("f1 LIKE 'abc%def'", true,
DataTypes.BOOLEAN())
+
+ // Pattern doesn't match
+ .testSqlResult("f0 LIKE 'orange'", false,
DataTypes.BOOLEAN()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+ .onFieldsWithData("test")
+ .andDataTypes(DataTypes.STRING())
+
+ // With backslash and double quote in the middle
+ .testSqlResult("f0 LIKE 'test\\\"more'", false,
DataTypes.BOOLEAN())
+
+ // With backslash at the end
+ .testSqlResult("f0 LIKE 'test\\\\'", false,
DataTypes.BOOLEAN()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+ .onFieldsWithData("test", "\"test", "te\"st",
"test\"", "test\\")
+ .andDataTypes(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING())
+
+ // Quick path
+ .testSqlResult("f0 LIKE 'test\"quote'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE 'te\"st'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE '\"test'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f1 LIKE '\"test'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE 'test\"'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f3 LIKE 'test\"'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE 'start\"test%'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE 'te\"s%'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE '%test\"end'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE '%te\"st'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE '%mid\"dle%'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE '%te\"st%'", true,
DataTypes.BOOLEAN())
+
+ // Trailing backslash
+ .testSqlResult("f0 LIKE 'test\\'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f4 LIKE 'test\\'", true,
DataTypes.BOOLEAN()));
+ }
+
+ private Stream<TestSetSpec> withEscape() {
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.LIKE)
+ .onFieldsWithData("test", "test%", "te_st", "te\"st",
"test\\", "✅test✅")
+ .andDataTypes(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING())
+ // Empty strings in pattern or escape
+ .testSqlResult("f0 LIKE 'test\"end' ESCAPE ''", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE '' ESCAPE ''", false,
DataTypes.BOOLEAN())
+ // Escaping with emoji
+ .testSqlResult("f0 LIKE 'test' ESCAPE '✅'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f1 LIKE 'test✅%' ESCAPE '✅'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f1 LIKE 'test!%' ESCAPE '!'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE '✅test' ESCAPE '!'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f0 LIKE '✅test' ESCAPE '\\'", false,
DataTypes.BOOLEAN())
+ .testSqlResult("f5 LIKE '✅test✅' ESCAPE '\\'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f5 LIKE '✅%✅' ESCAPE '\\'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f5 LIKE '✅%' ESCAPE '\\'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f5 LIKE '%st✅' ESCAPE '\\'", true,
DataTypes.BOOLEAN())
+ // Mixed escaped symbols
+ .testSqlResult("f2 LIKE 'te_st' ESCAPE '!'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE 'te__st' ESCAPE '_'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f1 LIKE 'test_%' ESCAPE '_'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f2 LIKE 'te%_st' ESCAPE '%'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f1 LIKE 'test%%' ESCAPE '%'", true,
DataTypes.BOOLEAN())
+ .testSqlValidationError(
+ "f2 LIKE 'te_st' ESCAPE '_'", "Invalid escape
sequence 'te_st', 2")
+ .testSqlValidationError(
+ "f1 LIKE 'test_' ESCAPE '_'", "Invalid escape
sequence 'test_', 4")
+ .testSqlValidationError(
+ "f2 LIKE 'te%st' ESCAPE '%'", "Invalid escape
sequence 'te%st', 2")
+ .testSqlValidationError(
+ "f1 LIKE 'test%' ESCAPE '%'", "Invalid escape
sequence 'test%', 4")
+ .testSqlValidationError(
+ "f0 LIKE 'test\\\"end' ESCAPE '\\'",
+ "Invalid escape sequence 'test\\\"end', 4")
+ .testSqlValidationError(
+ "f0 LIKE '%e_t%' ESCAPE 'ab'", "Invalid escape
character 'ab'")
+ // Mixed
+ .testSqlResult("f0 LIKE 'test\"end' ESCAPE '!'",
false, DataTypes.BOOLEAN())
+ .testSqlResult("f3 LIKE 'te\"st' ESCAPE '!'", true,
DataTypes.BOOLEAN())
+ .testSqlResult("f4 LIKE 'test\\' ESCAPE '!'", true,
DataTypes.BOOLEAN())
+ .testSqlResult(
+ "'a1bc' LIKE CAST('a%\"+1+\"b%c' AS STRING)
ESCAPE '!'",
+ false, DataTypes.BOOLEAN().notNull())
+ .testSqlResult(
+ "'a1\"+1+\"bc' LIKE CAST('a%\"+1+\"b%c' AS
STRING) ESCAPE '!'",
+ true, DataTypes.BOOLEAN().notNull())
+ // Unicode-like escape sequence
+ .testSqlResult(
+ "f0 LIKE 'test" + "\\u" + "000Aend' ESCAPE
'!'",
+ false,
+ DataTypes.BOOLEAN())
+ .testSqlResult(
+ "'test\\u000Aend' LIKE 'test" + "\\u" +
"000Aend' ESCAPE '!'",
+ true,
+ DataTypes.BOOLEAN().notNull())
+ // Special characters
+ .testSqlResult(
+ "f0 LIKE '\btest\ne\\nd\f' ESCAPE '!'", false,
DataTypes.BOOLEAN())
+ .testSqlResult(
+ "'\btest\ne\\nd\f' LIKE '\btest\ne\\nd\f'
ESCAPE '!'",
+ true,
+ DataTypes.BOOLEAN().notNull()));
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
index 3be1666a1db..06068031bfc 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlLikeChainChecker.java
@@ -45,11 +45,13 @@ public class SqlLikeChainChecker {
private final int[] midLens;
private final int beginLen;
private final int endLen;
+ private final boolean leftAnchor;
+ private final boolean rightAnchor;
public SqlLikeChainChecker(String pattern) {
final StringTokenizer tokens = new StringTokenizer(pattern, "%");
- final boolean leftAnchor = !pattern.startsWith("%");
- final boolean rightAnchor = !pattern.endsWith("%");
+ leftAnchor = !pattern.startsWith("%");
+ rightAnchor = !pattern.endsWith("%");
int len = 0;
// at least 2 checkers always
BinaryStringData leftPattern = null;
@@ -93,7 +95,15 @@ public class SqlLikeChainChecker {
MemorySegment[] segments = str.getSegments();
int pos = str.getOffset();
int mark = str.getSizeInBytes();
- if (str.getSizeInBytes() < minLen) {
+ // Returns false early if either:
+ // the input is too short to match the pattern, or
+ // the pattern is empty (or anchored with no literals) but the input
is not empty.
+ if (mark < minLen
+ || beginPattern == null
+ && endPattern == null
+ && middlePatterns.length == 0
+ && mark > 0
+ && (leftAnchor || rightAnchor)) {
return false;
}
// prefix, extend start