This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a5a7925415b [FLINK-39651][table] REGEXP plan-time validation and
hot-path log cleanup (#28292)
a5a7925415b is described below
commit a5a7925415bdf6f257b100cc27e9aeb19d096d04
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue Jun 2 17:15:47 2026 +0200
[FLINK-39651][table] REGEXP plan-time validation and hot-path log cleanup
(#28292)
* [FLINK-39651][table] Drop hot-path error log in REGEXP
SqlFunctionUtils.regExp caught any exception from pattern compilation and
logged it with LOG.error, producing one stack trace per processed record when
an invalid regex was supplied (for example, a column-referenced or
function-built pattern that fails to compile).
The pattern is already looked up through REGEXP_PATTERN_CACHE, so the only
hot-path cost left was the logging. PatternSyntaxException is now caught
silently and the function returns false, preserving the prior runtime contract
without flooding the log. Other exception types are no longer swallowed.
Adds runtime IT cases for REGEXP in RegexpFunctionsITCase covering literal
valid/no-match, null input, column-ref invalid, and function-call regex paths.
* [FLINK-39651][table] Route REGEXP through BridgingSqlFunction
Migrates REGEXP to the modern BuiltInFunctionDefinition stack while keeping
the existing StringCallGen codegen path.
The function definition is renamed to name("REGEXP") and marked
runtimeProvided() so it is exposed to both SQL and Table API conversion through
CoreModule. A new arm in ExprCodeGenerator's BridgingSqlFunction block routes
the call to StringCallGen.generateRegExp. The legacy
FlinkSqlOperatorTable.REGEXP entry, the DirectConvertRule mapping, and the case
REGEXP arm in StringCallGen are removed.
Behavior is preserved: same arg types, same codegen call, same runtime
helper. Only the registration path changes. The Table API and Python string
representation now renders as REGEXP(...) to match the function name.
* [FLINK-39651][table] Validate REGEXP literal patterns at plan time
Adds RegexpInputTypeStrategy, which validates the two STRING arguments and
delegates the literal-pattern check to the shared
StrategyUtils.validateLiteralPattern helper introduced for the regex function
family. Literal regex arguments are eagerly compiled during type inference, and
compilation failures surface as ValidationException; non-literal or null-literal
arguments are deferred to runtime.
REGEXP is wired through the new strategy via
SpecificInputTypeStrategies.REGEXP.
Tests: new RegexpInputTypeStrategyTest covers the four branches
(non-literal defers, valid literal compiles, null literal is deferred, invalid
literal fails). RegexpFunctionsITCase gains a plan-time validation TestSetSpec
covering both Table API and SQL paths.
---
.../pyflink/table/tests/test_expression.py | 2 +-
.../functions/BuiltInFunctionDefinitions.java | 8 +--
.../strategies/RegexpInputTypeStrategy.java | 83 ++++++++++++++++++++++
.../strategies/SpecificInputTypeStrategies.java | 3 +
.../strategies/RegexpInputTypeStrategyTest.java | 57 +++++++++++++++
.../expressions/converter/DirectConvertRule.java | 2 -
.../functions/sql/FlinkSqlOperatorTable.java | 9 ---
.../table/planner/codegen/ExprCodeGenerator.scala | 3 +
.../planner/codegen/calls/StringCallGen.scala | 2 -
.../planner/functions/RegexpFunctionsITCase.java | 47 ++++++++++++
.../table/runtime/functions/SqlFunctionUtils.java | 15 ++--
11 files changed, 207 insertions(+), 24 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_expression.py
b/flink-python/pyflink/table/tests/test_expression.py
index e3681e0a184..2b957e7aea6 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -193,7 +193,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
self.assertEqual("MAKE_VALID_UTF8(a)", str(expr1.make_valid_utf8))
# regexp functions
- self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
+ self.assertEqual("REGEXP(a, b)", str(expr1.regexp(expr2)))
self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
self.assertEqual('REGEXP_EXTRACT(a, b)',
str(expr1.regexp_extract(expr2)))
self.assertEqual('REGEXP_EXTRACT(a, b, 3)',
str(expr1.regexp_extract(expr2, 3)))
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 0b9ea76b175..7025a51904f 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1714,13 +1714,11 @@ public final class BuiltInFunctionDefinitions {
public static final BuiltInFunctionDefinition REGEXP =
BuiltInFunctionDefinition.newBuilder()
- .name("regexp")
+ .name("REGEXP")
.kind(SCALAR)
- .inputTypeStrategy(
- sequence(
-
logical(LogicalTypeFamily.CHARACTER_STRING),
-
logical(LogicalTypeFamily.CHARACTER_STRING)))
+ .inputTypeStrategy(SpecificInputTypeStrategies.REGEXP)
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
+ .runtimeProvided()
.build();
public static final BuiltInFunctionDefinition REGEXP_REPLACE =
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategy.java
new file mode 100644
index 00000000000..b80a04bee6e
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategy.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
+
+/**
+ * Input type strategy for {@link BuiltInFunctionDefinitions#REGEXP}.
Validates literal regex
+ * patterns at planning time.
+ */
+@Internal
+public class RegexpInputTypeStrategy implements InputTypeStrategy {
+
+ private static final int ARG_STR = 0;
+ private static final int ARG_REGEX = 1;
+
+ private static final ArgumentTypeStrategy STRING_ARG =
+ logical(LogicalTypeFamily.CHARACTER_STRING);
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return ConstantArgumentCount.of(2);
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ final CallContext callContext, final boolean throwOnFailure) {
+ if (STRING_ARG.inferArgumentType(callContext, ARG_STR,
throwOnFailure).isEmpty()) {
+ return Optional.empty();
+ }
+ if (STRING_ARG.inferArgumentType(callContext, ARG_REGEX,
throwOnFailure).isEmpty()) {
+ return Optional.empty();
+ }
+
+ final Optional<List<DataType>> patternError =
+ StrategyUtils.validateLiteralPattern(callContext, ARG_REGEX,
throwOnFailure);
+ if (patternError.isPresent()) {
+ return patternError;
+ }
+
+ return Optional.of(callContext.getArgumentDataTypes());
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(final FunctionDefinition
definition) {
+ return List.of(
+ Signature.of(
+ Argument.ofGroup("str",
LogicalTypeFamily.CHARACTER_STRING),
+ Argument.ofGroup("regex",
LogicalTypeFamily.CHARACTER_STRING)));
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index fc6cd13177f..dc7167a656d 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -194,6 +194,9 @@ public final class SpecificInputTypeStrategies {
/** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
*/
public static final InputTypeStrategy TO_TIMESTAMP_LTZ = new
ToTimestampLtzInputTypeStrategy();
+ /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP}. */
+ public static final InputTypeStrategy REGEXP = new
RegexpInputTypeStrategy();
+
/** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. */
public static final InputTypeStrategy REGEXP_EXTRACT = new
RegexpExtractInputTypeStrategy();
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategyTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategyTest.java
new file mode 100644
index 00000000000..6610eee392d
--- /dev/null
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategyTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.REGEXP;
+
+/** Tests for {@link RegexpInputTypeStrategy}. */
+class RegexpInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+ @Override
+ protected Stream<TestSpec> testData() {
+ return Stream.of(
+ // Non-literal regex skips the plan-time compile check and is
deferred to runtime.
+ TestSpec.forStrategy("Non-literal regex defers compile to
runtime", REGEXP)
+ .calledWithArgumentTypes(DataTypes.STRING(),
DataTypes.STRING())
+ .expectArgumentTypes(DataTypes.STRING(),
DataTypes.STRING()),
+
+ // Valid literal regex compiles cleanly at plan time.
+ TestSpec.forStrategy("Valid literal regex compiles", REGEXP)
+ .calledWithArgumentTypes(DataTypes.STRING(),
DataTypes.STRING())
+ .calledWithLiteralAt(1, "foo(.*?)bar")
+ .expectArgumentTypes(DataTypes.STRING(),
DataTypes.STRING()),
+
+ // Null literal regex short-circuits the plan-time check;
runtime returns null.
+ TestSpec.forStrategy("Null regex literal is deferred", REGEXP)
+ .calledWithArgumentTypes(DataTypes.STRING(),
DataTypes.STRING())
+ .calledWithLiteralAt(1, null)
+ .expectArgumentTypes(DataTypes.STRING(),
DataTypes.STRING()),
+
+ // Invalid literal regex surfaces as a ValidationException at
plan time.
+ TestSpec.forStrategy("Invalid literal regex fails at plan
time", REGEXP)
+ .calledWithArgumentTypes(DataTypes.STRING(),
DataTypes.STRING())
+ .calledWithLiteralAt(1, "(")
+ .expectErrorMessage("Invalid regular expression for"));
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 17dffb8613f..11b9545499a 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -195,8 +195,6 @@ public class DirectConvertRule implements
CallExpressionConvertRule {
BuiltInFunctionDefinitions.RTRIM, FlinkSqlOperatorTable.RTRIM);
definitionSqlOperatorHashMap.put(
BuiltInFunctionDefinitions.REPEAT,
FlinkSqlOperatorTable.REPEAT);
- definitionSqlOperatorHashMap.put(
- BuiltInFunctionDefinitions.REGEXP,
FlinkSqlOperatorTable.REGEXP);
definitionSqlOperatorHashMap.put(
BuiltInFunctionDefinitions.REVERSE,
FlinkSqlOperatorTable.REVERSE);
definitionSqlOperatorHashMap.put(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index f44f1806df3..7bcfd1e266e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -597,15 +597,6 @@ public class FlinkSqlOperatorTable extends
ReflectiveSqlOperatorTable {
OperandTypes.family(SqlTypeFamily.CHARACTER,
SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);
- public static final SqlFunction REGEXP =
- new SqlFunction(
- "REGEXP",
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.BOOLEAN_NULLABLE,
- null,
- OperandTypes.family(SqlTypeFamily.CHARACTER,
SqlTypeFamily.CHARACTER),
- SqlFunctionCategory.STRING);
-
public static final SqlFunction PARSE_URL =
new SqlFunction(
"PARSE_URL",
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index f07bf80bc05..b05eac28261 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -952,6 +952,9 @@ class ExprCodeGenerator(
case BuiltInFunctionDefinitions.JSON =>
new JsonCallGen().generate(ctx, operands,
FlinkTypeFactory.toLogicalType(call.getType))
+ case BuiltInFunctionDefinitions.REGEXP =>
+ StringCallGen.generateRegExp(ctx, operands, resultType)
+
case BuiltInFunctionDefinitions.REGEXP_EXTRACT =>
StringCallGen.generateRegexpExtract(ctx, operands, resultType)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 72b3ce2e036..33e335f0549 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -134,8 +134,6 @@ object StringCallGen {
case CHR => generateChr(ctx, operands, returnType)
- case REGEXP => generateRegExp(ctx, operands, returnType)
-
case BIN => generateBin(ctx, operands, returnType)
case CONCAT_FUNCTION =>
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
index e62010bd0c1..5d920617512 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
@@ -33,6 +33,7 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase {
@Override
Stream<TestSetSpec> getTestSetSpecs() {
return Stream.of(
+ regexpTestCases(),
regexpCountTestCases(),
regexpExtractTestCases(),
regexpExtractAllTestCases(),
@@ -42,6 +43,52 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase {
.flatMap(s -> s);
}
+ private Stream<TestSetSpec> regexpTestCases() {
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP)
+ .onFieldsWithData(null, "foobar", "(")
+ .andDataTypes(DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING())
+ .testResult(
+ $("f0").regexp("foo"),
+ "REGEXP(f0, 'foo')",
+ null,
+ DataTypes.BOOLEAN().nullable())
+ .testResult(
+ $("f1").regexp("foo"),
+ "REGEXP(f1, 'foo')",
+ true,
+ DataTypes.BOOLEAN().nullable())
+ .testResult(
+ $("f1").regexp("xyz"),
+ "REGEXP(f1, 'xyz')",
+ false,
+ DataTypes.BOOLEAN().nullable())
+ .testResult(
+ $("f1").regexp($("f2")),
+ "REGEXP(f1, f2)",
+ false,
+ DataTypes.BOOLEAN().nullable())
+ .testResult(
+ $("f1").regexp(concat("fo", "o")),
+ "REGEXP(f1, 'fo' || 'o')",
+ true,
+ DataTypes.BOOLEAN().nullable())
+ .testResult(
+ $("f1").regexp(concat("(", "")),
+ "REGEXP(f1, '(' || '')",
+ false,
+ DataTypes.BOOLEAN().nullable()),
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.REGEXP,
+ "Invalid literal regex fails at plan time")
+ .onFieldsWithData("foobar")
+ .andDataTypes(DataTypes.STRING())
+ .testTableApiValidationError(
+ $("f0").regexp("("), "Invalid regular
expression for REGEXP:")
+ .testSqlValidationError(
+ "REGEXP(f0, '(')", "Invalid regular expression
for REGEXP:"));
+ }
+
private Stream<TestSetSpec> regexpCountTestCases() {
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 1932a6db862..54c45f0a4f1 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -1015,14 +1015,19 @@ public class SqlFunctionUtils {
return Math.abs(str.hashCode());
}
- public static Boolean regExp(String s, String regex) {
- if (regex.length() == 0) {
+ /**
+ * Returns whether {@code s} contains a match for the regular expression
{@code regex}. Literal
+ * regexes are validated at planning time by the input type strategy.
+ */
+ public static boolean regExp(String s, String regex) {
+ if (regex.isEmpty()) {
return false;
}
try {
- return (REGEXP_PATTERN_CACHE.get(regex)).matcher(s).find(0);
- } catch (Exception e) {
- LOG.error("Exception when compile and match regex:" + regex + "
on: " + s, e);
+ return REGEXP_PATTERN_CACHE.get(regex).matcher(s).find(0);
+ } catch (PatternSyntaxException e) {
+ // Literals are rejected at planning time; non-literal invalid
regex
+ // returns false to preserve the prior runtime contract.
return false;
}
}