This is an automated email from the ASF dual-hosted git repository. AHeise pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3c7a45a61b3394790cab531205d576976ae50da7 Author: Ramin Gharib <[email protected]> AuthorDate: Mon May 18 15:00:38 2026 +0200 [FLINK-39650][table] Validate REGEXP_REPLACE literal patterns at plan time Adds StrategyUtils.validateLiteralPattern as a shared helper for the regex function family. It eagerly compiles literal regex arguments during type inference and surfaces compilation failures as a ValidationException via callContext.fail; the error message names the calling function via callContext.getName(). Non-literal and NULL-literal pattern arguments are not checked at plan time and are deferred to runtime. Wires REGEXP_REPLACE through the new RegexpReplaceInputTypeStrategy, which requires exactly three character-string (CHAR/VARCHAR) arguments and delegates the literal-pattern check to the shared helper. REGEXP_EXTRACT is refactored to use the same helper. Tests: new RegexpReplaceInputTypeStrategyTest covers the four branches (non-literal is deferred, valid literal compiles, null literal is deferred, invalid literal fails). RegexpFunctionsITCase gains a plan-time validation TestSetSpec for REGEXP_REPLACE covering both Table API and SQL paths. --- .../functions/BuiltInFunctionDefinitions.java | 6 +-- .../strategies/RegexpExtractInputTypeStrategy.java | 24 +----------- ...gy.java => RegexpReplaceInputTypeStrategy.java} | 43 ++++------------------ .../strategies/SpecificInputTypeStrategies.java | 3 ++ .../types/inference/strategies/StrategyUtils.java | 32 ++++++++++++++++ .../RegexpExtractInputTypeStrategyTest.java | 2 +- ...ava => RegexpReplaceInputTypeStrategyTest.java} | 38 ++++++++++--------- .../planner/functions/RegexpFunctionsITCase.java | 13 ++++++- 8 files changed, 78 insertions(+), 83 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index cbf39763cdb..7a25ae6058a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1680,11 +1680,7 @@ public final class BuiltInFunctionDefinitions { BuiltInFunctionDefinition.newBuilder() .name("REGEXP_REPLACE") .kind(SCALAR) - .inputTypeStrategy( - sequence( - logical(LogicalTypeFamily.CHARACTER_STRING), - logical(LogicalTypeFamily.CHARACTER_STRING), - logical(LogicalTypeFamily.CHARACTER_STRING))) + .inputTypeStrategy(SpecificInputTypeStrategies.REGEXP_REPLACE) .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING()))) .runtimeProvided() .build(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java index e95620105e7..a6330757487 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java @@ -34,8 +34,6 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot; import java.util.List; import java.util.Optional; -import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; import static org.apache.flink.table.types.inference.InputTypeStrategies.logical; @@ -75,7 +73,7 @@ public class RegexpExtractInputTypeStrategy implements InputTypeStrategy { } final Optional<List<DataType>> patternError = - validateLiteralPattern(callContext, throwOnFailure); + StrategyUtils.validateLiteralPattern(callContext, ARG_REGEX, throwOnFailure); if (patternError.isPresent()) { return patternError; } @@ -83,26 +81,6 @@ public class RegexpExtractInputTypeStrategy implements InputTypeStrategy { return Optional.of(callContext.getArgumentDataTypes()); } - private static 
Optional<List<DataType>> validateLiteralPattern( - final CallContext callContext, final boolean throwOnFailure) { - if (!callContext.isArgumentLiteral(ARG_REGEX) || callContext.isArgumentNull(ARG_REGEX)) { - return Optional.empty(); - } - final Optional<String> pattern = callContext.getArgumentValue(ARG_REGEX, String.class); - if (pattern.isEmpty()) { - return Optional.empty(); - } - try { - Pattern.compile(pattern.get()); - return Optional.empty(); - } catch (PatternSyntaxException e) { - return callContext.fail( - throwOnFailure, - "Invalid regular expression for REGEXP_EXTRACT: %s", - e.getMessage()); - } - } - @Override public List<Signature> getExpectedSignatures(FunctionDefinition definition) { return List.of( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategy.java similarity index 63% copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategy.java index e95620105e7..3e31fb4a228 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategy.java @@ -30,33 +30,29 @@ import org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.Signature; import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.logical.LogicalTypeFamily; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import java.util.List; import java.util.Optional; 
-import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; import static org.apache.flink.table.types.inference.InputTypeStrategies.logical; /** - * Input type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. Validates literal + * Input type strategy for {@link BuiltInFunctionDefinitions#REGEXP_REPLACE}. Validates literal * regex patterns at planning time. */ @Internal -public class RegexpExtractInputTypeStrategy implements InputTypeStrategy { +public class RegexpReplaceInputTypeStrategy implements InputTypeStrategy { private static final int ARG_STR = 0; private static final int ARG_REGEX = 1; - private static final int ARG_EXTRACT_INDEX = 2; + private static final int ARG_REPLACEMENT = 2; private static final ArgumentTypeStrategy STRING_ARG = logical(LogicalTypeFamily.CHARACTER_STRING); - private static final ArgumentTypeStrategy INT_ARG = logical(LogicalTypeRoot.INTEGER); @Override public ArgumentCount getArgumentCount() { - return ConstantArgumentCount.between(2, 3); + return ConstantArgumentCount.of(3); } @Override @@ -68,14 +64,12 @@ public class RegexpExtractInputTypeStrategy implements InputTypeStrategy { if (STRING_ARG.inferArgumentType(callContext, ARG_REGEX, throwOnFailure).isEmpty()) { return Optional.empty(); } - if (callContext.getArgumentDataTypes().size() > ARG_EXTRACT_INDEX - && INT_ARG.inferArgumentType(callContext, ARG_EXTRACT_INDEX, throwOnFailure) - .isEmpty()) { + if (STRING_ARG.inferArgumentType(callContext, ARG_REPLACEMENT, throwOnFailure).isEmpty()) { return Optional.empty(); } final Optional<List<DataType>> patternError = - validateLiteralPattern(callContext, throwOnFailure); + StrategyUtils.validateLiteralPattern(callContext, ARG_REGEX, throwOnFailure); if (patternError.isPresent()) { return patternError; } @@ -83,35 +77,12 @@ public class RegexpExtractInputTypeStrategy implements InputTypeStrategy { return Optional.of(callContext.getArgumentDataTypes()); } - private static Optional<List<DataType>> 
validateLiteralPattern( - final CallContext callContext, final boolean throwOnFailure) { - if (!callContext.isArgumentLiteral(ARG_REGEX) || callContext.isArgumentNull(ARG_REGEX)) { - return Optional.empty(); - } - final Optional<String> pattern = callContext.getArgumentValue(ARG_REGEX, String.class); - if (pattern.isEmpty()) { - return Optional.empty(); - } - try { - Pattern.compile(pattern.get()); - return Optional.empty(); - } catch (PatternSyntaxException e) { - return callContext.fail( - throwOnFailure, - "Invalid regular expression for REGEXP_EXTRACT: %s", - e.getMessage()); - } - } - @Override public List<Signature> getExpectedSignatures(FunctionDefinition definition) { return List.of( - Signature.of( - Argument.ofGroup("str", LogicalTypeFamily.CHARACTER_STRING), - Argument.ofGroup("regex", LogicalTypeFamily.CHARACTER_STRING)), Signature.of( Argument.ofGroup("str", LogicalTypeFamily.CHARACTER_STRING), Argument.ofGroup("regex", LogicalTypeFamily.CHARACTER_STRING), - Argument.ofGroup("extractIndex", LogicalTypeRoot.INTEGER))); + Argument.ofGroup("replacement", LogicalTypeFamily.CHARACTER_STRING))); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java index 81d689a550c..fc6cd13177f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java @@ -197,6 +197,9 @@ public final class SpecificInputTypeStrategies { /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. 
*/ public static final InputTypeStrategy REGEXP_EXTRACT = new RegexpExtractInputTypeStrategy(); + /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP_REPLACE}. */ + public static final InputTypeStrategy REGEXP_REPLACE = new RegexpReplaceInputTypeStrategy(); + private SpecificInputTypeStrategies() { // no instantiation } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java index 3e79975de22..bc387ccae81 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java @@ -37,8 +37,11 @@ import org.apache.flink.table.types.utils.TypeConversions; import javax.annotation.Nullable; +import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; import static org.apache.flink.table.types.logical.LogicalTypeFamily.APPROXIMATE_NUMERIC; import static org.apache.flink.table.types.logical.LogicalTypeFamily.EXACT_NUMERIC; @@ -219,6 +222,35 @@ final class StrategyUtils { return Optional.empty(); } + /** + * Validates that the argument at {@code regexArgPos} is a literal that compiles as a Java + * regex. Non-literal or null-literal regex arguments are deferred to runtime. + * + * @return {@link Optional#empty()} when the argument is not a literal, is a {@code NULL} + * literal, or compiles cleanly; the result of {@link CallContext#fail} otherwise. 
+ */ + static Optional<List<DataType>> validateLiteralPattern( + final CallContext callContext, final int regexArgPos, final boolean throwOnFailure) { + if (!callContext.isArgumentLiteral(regexArgPos) + || callContext.isArgumentNull(regexArgPos)) { + return Optional.empty(); + } + final Optional<String> pattern = callContext.getArgumentValue(regexArgPos, String.class); + if (pattern.isEmpty()) { + return Optional.empty(); + } + try { + Pattern.compile(pattern.get()); + return Optional.empty(); + } catch (PatternSyntaxException e) { + return callContext.fail( + throwOnFailure, + "Invalid regular expression for %s: %s", + callContext.getName(), + e.getMessage()); + } + } + private StrategyUtils() { // no instantiation } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java index 8e92023f881..7e514eeb390 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java @@ -55,6 +55,6 @@ class RegexpExtractInputTypeStrategyTest extends InputTypeStrategiesTestBase { TestSpec.forStrategy("Invalid literal regex fails at plan time", REGEXP_EXTRACT) .calledWithArgumentTypes(DataTypes.STRING(), DataTypes.STRING()) .calledWithLiteralAt(1, "(") - .expectErrorMessage("Invalid regular expression for REGEXP_EXTRACT:")); + .expectErrorMessage("Invalid regular expression for")); } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategyTest.java similarity index 67% copy from flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java copy to flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategyTest.java index 8e92023f881..6b8c219f96b 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategyTest.java @@ -23,38 +23,42 @@ import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase; import java.util.stream.Stream; -import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.REGEXP_EXTRACT; +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.REGEXP_REPLACE; -/** Tests for {@link RegexpExtractInputTypeStrategy}. */ -class RegexpExtractInputTypeStrategyTest extends InputTypeStrategiesTestBase { +/** Tests for {@link RegexpReplaceInputTypeStrategy}. */ +class RegexpReplaceInputTypeStrategyTest extends InputTypeStrategiesTestBase { @Override protected Stream<TestSpec> testData() { return Stream.of( // Non-literal regex skips the plan-time compile check and is deferred to runtime. 
- TestSpec.forStrategy("Non-literal regex defers compile to runtime", REGEXP_EXTRACT) - .calledWithArgumentTypes(DataTypes.STRING(), DataTypes.STRING()) - .expectArgumentTypes(DataTypes.STRING(), DataTypes.STRING()), + TestSpec.forStrategy("Non-literal regex defers compile to runtime", REGEXP_REPLACE) + .calledWithArgumentTypes( + DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()) + .expectArgumentTypes( + DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()), // Valid literal regex compiles cleanly at plan time. - TestSpec.forStrategy("Valid literal regex compiles", REGEXP_EXTRACT) + TestSpec.forStrategy("Valid literal regex compiles", REGEXP_REPLACE) .calledWithArgumentTypes( - DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT()) + DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()) .calledWithLiteralAt(1, "foo(.*?)bar") .expectArgumentTypes( - DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT()), + DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()), // Null literal regex short-circuits the plan-time check; runtime returns null. - TestSpec.forStrategy("Null regex literal is deferred", REGEXP_EXTRACT) - .calledWithArgumentTypes(DataTypes.STRING(), DataTypes.STRING()) + TestSpec.forStrategy("Null regex literal is deferred", REGEXP_REPLACE) + .calledWithArgumentTypes( + DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()) .calledWithLiteralAt(1, null) - .expectArgumentTypes(DataTypes.STRING(), DataTypes.STRING()), + .expectArgumentTypes( + DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()), - // Invalid literal regex surfaces as a ValidationException at plan time - // instead of producing one log line per record at runtime. - TestSpec.forStrategy("Invalid literal regex fails at plan time", REGEXP_EXTRACT) - .calledWithArgumentTypes(DataTypes.STRING(), DataTypes.STRING()) + // Invalid literal regex surfaces as a ValidationException at plan time. 
+ TestSpec.forStrategy("Invalid literal regex fails at plan time", REGEXP_REPLACE) + .calledWithArgumentTypes( + DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()) .calledWithLiteralAt(1, "(") - .expectErrorMessage("Invalid regular expression for REGEXP_EXTRACT:")); + .expectErrorMessage("Invalid regular expression for")); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java index 85c5ef49d61..e62010bd0c1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java @@ -375,7 +375,18 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase { $("f1").regexpReplace(concat("(", ""), "X"), "REGEXP_REPLACE(f1, '(' || '', 'X')", null, - DataTypes.STRING().nullable())); + DataTypes.STRING().nullable()), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.REGEXP_REPLACE, + "Invalid literal regex fails at plan time") + .onFieldsWithData("foobar") + .andDataTypes(DataTypes.STRING()) + .testTableApiValidationError( + $("f0").regexpReplace("(", "X"), + "Invalid regular expression for REGEXP_REPLACE:") + .testSqlValidationError( + "REGEXP_REPLACE(f0, '(', 'X')", + "Invalid regular expression for REGEXP_REPLACE:")); } private Stream<TestSetSpec> regexpSubstrTestCases() {
