This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff7a0f6f25748933729bc94038444b3d4e233315 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Mon Mar 21 14:39:56 2022 +0100 [FLINK-13785][table] Port time functions to new type inference This closes #19190. --- .../java/functions/InternalRowMergerFunction.java | 11 +- .../functions/BuiltInFunctionDefinitions.java | 66 +++++++--- .../flink/table/types/inference/CallContext.java | 16 ++- .../table/types/inference/InputTypeStrategies.java | 27 +++- .../flink/table/types/inference/Signature.java | 73 ++++++++++- .../table/types/inference/TypeStrategies.java | 9 +- .../strategies/AnyArgumentTypeStrategy.java | 7 +- ...ategy.java => ArgumentMappingTypeStrategy.java} | 12 +- .../strategies/CastInputTypeStrategy.java | 11 +- .../strategies/CommonArgumentTypeStrategy.java | 7 +- .../strategies/CommonInputTypeStrategy.java | 16 +-- .../strategies/ComparableTypeStrategy.java | 28 ++-- .../strategies/CompositeArgumentTypeStrategy.java | 13 +- .../strategies/ConstraintArgumentTypeStrategy.java | 8 +- .../CurrentWatermarkInputTypeStrategy.java | 22 ++-- .../strategies/ExplicitArgumentTypeStrategy.java | 13 +- .../strategies/ExtractInputTypeStrategy.java | 108 +++++++++++++++ .../strategies/FamilyArgumentTypeStrategy.java | 23 ++-- .../strategies/LiteralArgumentTypeStrategy.java | 19 +-- .../strategies/OutputArgumentTypeStrategy.java | 7 +- .../strategies/RootArgumentTypeStrategy.java | 17 ++- .../strategies/SpecificInputTypeStrategies.java | 7 + .../strategies/SpecificTypeStrategies.java | 3 + .../strategies/SubsequenceInputTypeStrategy.java | 11 +- .../strategies/SymbolArgumentTypeStrategy.java | 59 +++++---- .../TemporalOverlapsInputTypeStrategy.java | 120 +++++++++++++++++ ...rategy.java => ToTimestampLtzTypeStrategy.java} | 22 +--- .../TypeLiteralArgumentTypeStrategy.java | 2 +- .../VaryingSequenceInputTypeStrategy.java | 4 +- .../flink/table/types/logical/LogicalType.java | 10 ++ ...unctionITCase.java => TimeFunctionsITCase.java} | 145 +++++++++++++++++++-- .../planner/expressions/SqlExpressionTest.scala | 3 - .../planner/expressions/TemporalTypesTest.scala | 8 +- .../expressions/utils/ExpressionTestBase.scala | 29 ++--- 34 files changed, 710 insertions(+), 226 deletions(-) diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java index f0cfe4fc993..389c40fd0c5 100644 --- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java @@ -80,11 +80,8 @@ public final class InternalRowMergerFunction extends ScalarFunction { if (arg0.getLogicalType().getTypeRoot() != LogicalTypeRoot.ROW || arg1.getLogicalType().getTypeRoot() != LogicalTypeRoot.ROW) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Two row arguments expected."); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, "Two row arguments expected."); } // keep the original logical type but express that both arguments // should use internal data structures @@ -99,7 +96,9 @@ public final class InternalRowMergerFunction extends ScalarFunction { FunctionDefinition definition) { // this helps in printing nice error messages return Collections.singletonList( - Signature.of(Argument.of("ROW"), Argument.of("ROW"))); + Signature.of( + Argument.ofGroup(LogicalTypeRoot.ROW), + Argument.ofGroup(LogicalTypeRoot.ROW))); } }) .outputTypeStrategy( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 725d823a1bf..fe0d40ef098 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.JsonQueryWrapper; import org.apache.flink.table.api.JsonType; import org.apache.flink.table.api.JsonValueOnEmptyOrError; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.TimePointUnit; import org.apache.flink.table.types.inference.ArgumentTypeStrategy; import org.apache.flink.table.types.inference.ConstantArgumentCount; import org.apache.flink.table.types.inference.InputTypeStrategies; @@ -48,6 +49,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.BOOLEAN; +import static org.apache.flink.table.api.DataTypes.DATE; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIME; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ; import static org.apache.flink.table.functions.FunctionKind.AGGREGATE; import static org.apache.flink.table.functions.FunctionKind.OTHER; import static org.apache.flink.table.functions.FunctionKind.SCALAR; @@ -365,7 +374,7 @@ public final class BuiltInFunctionDefinitions { .name("count") .kind(AGGREGATE) .inputTypeStrategy(sequence(ANY)) // COUNT(*) is not supported yet - .outputTypeStrategy(explicit(DataTypes.BIGINT().notNull())) + .outputTypeStrategy(explicit(BIGINT().notNull())) .build(); public static final BuiltInFunctionDefinition MAX = @@ -463,7 +472,7 @@ public final class BuiltInFunctionDefinitions { .name("charLength") .kind(SCALAR) .inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING))) - .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.INT()))) + .outputTypeStrategy(nullableIfArgs(explicit(INT()))) .build(); public static final BuiltInFunctionDefinition INIT_CAP = @@ -581,7 +590,7 @@ public final class BuiltInFunctionDefinitions { sequence( logical(LogicalTypeFamily.CHARACTER_STRING), logical(LogicalTypeFamily.CHARACTER_STRING))) - .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.INT()))) + .outputTypeStrategy(nullableIfArgs(explicit(INT()))) .build(); public static final BuiltInFunctionDefinition OVERLAY = @@ -1179,7 +1188,7 @@ public final class BuiltInFunctionDefinitions { sequence( logical(LogicalTypeRoot.INTEGER), logical(LogicalTypeRoot.INTEGER)))) - .outputTypeStrategy(explicit(DataTypes.INT().notNull())) + .outputTypeStrategy(explicit(INT().notNull())) .build(); public static final BuiltInFunctionDefinition BIN = @@ -1222,76 +1231,103 @@ public final class BuiltInFunctionDefinitions { BuiltInFunctionDefinition.newBuilder() .name("extract") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.EXTRACT) + .outputTypeStrategy(nullableIfArgs(explicit(BIGINT()))) .build(); public static final BuiltInFunctionDefinition CURRENT_DATE = BuiltInFunctionDefinition.newBuilder() .name("currentDate") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(DATE().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIME = BuiltInFunctionDefinition.newBuilder() .name("currentTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition CURRENT_ROW_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("currentRowTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIME = BuiltInFunctionDefinition.newBuilder() .name("localTime") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIME().notNull())) .build(); public static final BuiltInFunctionDefinition LOCAL_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() .name("localTimestamp") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(explicit(TIMESTAMP(3).notNull())) .build(); public static final BuiltInFunctionDefinition TEMPORAL_OVERLAPS = BuiltInFunctionDefinition.newBuilder() .name("temporalOverlaps") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy(SpecificInputTypeStrategies.TEMPORAL_OVERLAPS) + .outputTypeStrategy(nullableIfArgs(explicit(BOOLEAN()))) .build(); public static final BuiltInFunctionDefinition DATE_FORMAT = BuiltInFunctionDefinition.newBuilder() .name("dateFormat") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + or( + sequence( + logical(LogicalTypeFamily.TIMESTAMP), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)))) + .outputTypeStrategy(nullableIfArgs(explicit(STRING()))) .build(); public static final BuiltInFunctionDefinition TIMESTAMP_DIFF = BuiltInFunctionDefinition.newBuilder() .name("timestampDiff") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + sequence( + symbol( + TimePointUnit.YEAR, + TimePointUnit.QUARTER, + TimePointUnit.MONTH, + TimePointUnit.WEEK, + TimePointUnit.DAY, + TimePointUnit.HOUR, + TimePointUnit.MINUTE, + TimePointUnit.SECOND), + logical(LogicalTypeFamily.DATETIME), + logical(LogicalTypeFamily.DATETIME))) + .outputTypeStrategy(nullableIfArgs(explicit(INT()))) .build(); public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ = BuiltInFunctionDefinition.newBuilder() .name("toTimestampLtz") .kind(SCALAR) - .outputTypeStrategy(TypeStrategies.MISSING) + .inputTypeStrategy( + sequence( + logical(LogicalTypeFamily.NUMERIC), + logical(LogicalTypeFamily.INTEGER_NUMERIC, false))) + .outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP_LTZ) .build(); // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java index 13cfc11e2ca..62fdd9f4d8a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java @@ -86,13 +86,27 @@ public interface CallContext { Optional<DataType> getOutputDataType(); /** - * Creates a validation error for exiting the type inference process with a meaningful + * Creates a validation exception for exiting the type inference process with a meaningful * exception. */ default ValidationException newValidationError(String message, Object... args) { return new ValidationException(String.format(message, args)); } + /** + * Helper method for handling failures during the type inference process while considering the + * {@code throwOnFailure} flag. + * + * <p>Shorthand for {@code if (throwOnFailure) throw ValidationException(...) else return + * Optional.empty()}. + */ + default <T> Optional<T> fail(boolean throwOnFailure, String message, Object... args) { + if (throwOnFailure) { + throw newValidationError(message, args); + } + return Optional.empty(); + } + /** * Returns whether the function call happens as part of an aggregation that defines grouping * columns. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java index 2f3feea6259..fe7157453e0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java @@ -50,6 +50,7 @@ import java.util.Arrays; import java.util.List; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Strategies for inferring and validating input arguments in a function call. @@ -304,10 +305,30 @@ public final class InputTypeStrategies { return new OrArgumentTypeStrategy(Arrays.asList(strategies)); } - /** Strategy for a symbol argument of a specific {@link TableSymbol} enum. */ - public static SymbolArgumentTypeStrategy symbol( + /** + * Strategy for a symbol argument of a specific {@link TableSymbol} enum. + * + * <p>A symbol is implied to be a literal argument. + */ + public static SymbolArgumentTypeStrategy<?> symbol( Class<? extends Enum<? extends TableSymbol>> clazz) { - return new SymbolArgumentTypeStrategy(clazz); + return new SymbolArgumentTypeStrategy<>(clazz); + } + + /** + * Strategy for a symbol argument of a specific {@link TableSymbol} enum, with value being one + * of the provided variants. + * + * <p>A symbol is implied to be a literal argument. + */ + @SafeVarargs + @SuppressWarnings("unchecked") + public static <T extends Enum<? extends TableSymbol>> SymbolArgumentTypeStrategy<T> symbol( + T firstAllowedVariant, T... otherAllowedVariants) { + return new SymbolArgumentTypeStrategy<T>( + (Class<T>) firstAllowedVariant.getClass(), + Stream.concat(Stream.of(firstAllowedVariant), Arrays.stream(otherAllowedVariants)) + .collect(Collectors.toSet())); } /** diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java index e845ca8f217..0d9495a365c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java @@ -19,7 +19,11 @@ package org.apache.flink.table.types.inference; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.TableSymbol; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -61,8 +65,12 @@ public final class Signature { /** * Representation of a single argument in a signature. * - * <p>The type is represented as {@link String} in order to also express type families or - * varargs. + * <p>The argument is represented as a {@link String} in order to express both explicit types + * (see {@code of(...)}) or groups/families of types (see {@code ofKind(...)}). + * + * <p>The general string formatting convention is to use {@code T} for explicit types, {@code + * <T>} for groups/families of types, {@code T...} for varargs, and {@code [T]} for + * conditions. */ @PublicEvolving public static final class Argument { @@ -76,16 +84,71 @@ public final class Signature { this.type = Preconditions.checkNotNull(type); } - /** Returns an instance of {@link Argument}. */ public static Argument of(String name, String type) { - return new Argument(Preconditions.checkNotNull(name, "Name must not be null."), type); + return new Argument(name, type); + } + + public static Argument of(String name, LogicalType type) { + return of(name, type.asSummaryString()); + } + + public static Argument ofVarying(String name, String type) { + return new Argument(name, type + "..."); } - /** Returns an instance of {@link Argument}. */ public static Argument of(String type) { return new Argument(null, type); } + public static Argument of(LogicalType type) { + return of(type.asSummaryString()); + } + + public static Argument ofVarying(String type) { + return new Argument(null, type + "..."); + } + + public static Argument ofGroup(String name, String typeGroup) { + return new Argument(name, "<" + typeGroup + ">"); + } + + public static Argument ofGroup(String name, LogicalTypeRoot typeRoot) { + return ofGroup(name, typeRoot.name()); + } + + public static Argument ofGroup(String name, LogicalTypeFamily typeFamily) { + return ofGroup(name, typeFamily.name()); + } + + public static Argument ofGroup( + String name, Class<? extends Enum<? extends TableSymbol>> symbol) { + return ofGroup(name, symbol.getSimpleName()); + } + + public static Argument ofGroupVarying(String name, String typeGroup) { + return new Argument(name, "<" + typeGroup + ">..."); + } + + public static Argument ofGroup(String typeGroup) { + return ofGroup(null, typeGroup); + } + + public static Argument ofGroup(LogicalTypeRoot typeRoot) { + return ofGroup(typeRoot.name()); + } + + public static Argument ofGroup(LogicalTypeFamily typeFamily) { + return ofGroup(typeFamily.name()); + } + + public static Argument ofGroup(Class<? extends Enum<? extends TableSymbol>> symbol) { + return ofGroup(symbol.getSimpleName()); + } + + public static Argument ofGroupVarying(String typeGroup) { + return new Argument(null, "<" + typeGroup + ">..."); + } + public Optional<String> getName() { return Optional.ofNullable(name); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java index 421fd71b405..a518ba03b6a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java @@ -20,6 +20,7 @@ package org.apache.flink.table.types.inference; import org.apache.flink.annotation.Internal; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.strategies.ArgumentMappingTypeStrategy; import org.apache.flink.table.types.inference.strategies.CommonTypeStrategy; import org.apache.flink.table.types.inference.strategies.ExplicitTypeStrategy; import org.apache.flink.table.types.inference.strategies.FirstTypeStrategy; @@ -28,7 +29,6 @@ import org.apache.flink.table.types.inference.strategies.MappingTypeStrategy; import org.apache.flink.table.types.inference.strategies.MatchFamilyTypeStrategy; import org.apache.flink.table.types.inference.strategies.MissingTypeStrategy; import org.apache.flink.table.types.inference.strategies.NullableIfArgsTypeStrategy; -import org.apache.flink.table.types.inference.strategies.UseArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.VaryingStringTypeStrategy; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; @@ -62,7 +62,12 @@ public final class TypeStrategies { /** Type strategy that returns the n-th input argument. */ public static TypeStrategy argument(int pos) { - return new UseArgumentTypeStrategy(pos); + return new ArgumentMappingTypeStrategy(pos, Optional::of); + } + + /** Type strategy that returns the n-th input argument, mapping it. */ + public static TypeStrategy argument(int pos, Function<DataType, Optional<DataType>> mapper) { + return new ArgumentMappingTypeStrategy(pos, mapper); } /** Type strategy that returns the first type that could be inferred. */ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java index 3163e453b21..4dbb3d7a112 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java @@ -22,7 +22,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.ArgumentTypeStrategy; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import java.util.Optional; @@ -37,9 +37,8 @@ public final class AnyArgumentTypeStrategy implements ArgumentTypeStrategy { } @Override - public Signature.Argument getExpectedArgument( - FunctionDefinition functionDefinition, int argumentPos) { - return Signature.Argument.of("<ANY>"); + public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { + return Argument.ofGroup("ANY"); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArgumentMappingTypeStrategy.java similarity index 75% copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArgumentMappingTypeStrategy.java index f186d7fec5f..3e96ef7cda9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArgumentMappingTypeStrategy.java @@ -26,16 +26,20 @@ import org.apache.flink.util.Preconditions; import java.util.List; import java.util.Optional; +import java.util.function.Function; -/** Type strategy that returns the n-th input argument. */ +/** Type strategy that returns the n-th input argument, mapping it with the provided function. */ @Internal -public final class UseArgumentTypeStrategy implements TypeStrategy { +public final class ArgumentMappingTypeStrategy implements TypeStrategy { private final int pos; + private final Function<DataType, Optional<DataType>> mapper; - public UseArgumentTypeStrategy(int pos) { + public ArgumentMappingTypeStrategy(int pos, Function<DataType, Optional<DataType>> mapper) { Preconditions.checkArgument(pos >= 0); + Preconditions.checkNotNull(mapper); this.pos = pos; + this.mapper = mapper; } @Override @@ -44,6 +48,6 @@ public final class UseArgumentTypeStrategy implements TypeStrategy { if (pos >= argumentDataTypes.size()) { return Optional.empty(); } - return Optional.of(argumentDataTypes.get(pos)); + return mapper.apply(argumentDataTypes.get(pos)); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java index 92bdbe56485..fbd4bf188d7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java @@ -27,6 +27,7 @@ import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.ConstantArgumentCount; import org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; @@ -68,11 +69,8 @@ class CastInputTypeStrategy implements InputTypeStrategy { return Optional.of(argumentDataTypes); } if (!supportsExplicitCast(fromType, toType)) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Unsupported cast from '%s' to '%s'.", fromType, toType); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, "Unsupported cast from '%s' to '%s'.", fromType, toType); } return Optional.of(argumentDataTypes); } @@ -80,7 +78,6 @@ class CastInputTypeStrategy implements InputTypeStrategy { @Override public List<Signature> getExpectedSignatures(FunctionDefinition definition) { return Collections.singletonList( - Signature.of( - Signature.Argument.of("<ANY>"), Signature.Argument.of("<TYPE LITERAL>"))); + Signature.of(Argument.ofGroup("ANY"), Argument.ofGroup("TYPE LITERAL"))); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java index a1d769137b6..b2e6f947962 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java @@ -23,7 +23,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.ArgumentTypeStrategy; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; import org.apache.flink.table.types.utils.TypeConversions; @@ -41,7 +41,7 @@ import java.util.stream.Collectors; @Internal public final class CommonArgumentTypeStrategy implements ArgumentTypeStrategy { - private static final Signature.Argument COMMON_ARGUMENT = Signature.Argument.of("<COMMON>"); + private static final Argument COMMON_ARGUMENT = Argument.ofGroup("COMMON"); private final boolean preserveNullability; @@ -66,8 +66,7 @@ public final class CommonArgumentTypeStrategy implements ArgumentTypeStrategy { } @Override - public Signature.Argument getExpectedArgument( - FunctionDefinition functionDefinition, int argumentPos) { + public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { return COMMON_ARGUMENT; } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java index 6224d9fb665..47f250d73bc 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java @@ -25,6 +25,7 @@ import org.apache.flink.table.types.inference.ArgumentCount; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.utils.LogicalTypeMerging; @@ -41,7 +42,7 @@ import java.util.stream.IntStream; @Internal public final class CommonInputTypeStrategy implements InputTypeStrategy { - private static final Signature.Argument COMMON_ARGUMENT = Signature.Argument.of("<COMMON>"); + private static final Argument COMMON_ARGUMENT = Argument.ofGroup("COMMON"); private final ArgumentCount argumentCount; @@ -70,11 +71,10 @@ public final class CommonInputTypeStrategy implements InputTypeStrategy { Optional<LogicalType> commonType = LogicalTypeMerging.findCommonType(argumentTypes); if (!commonType.isPresent()) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Could not find a common type for arguments: %s", argumentDataTypes); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "Could not find a common type for arguments: %s", + argumentDataTypes); } return commonType.map( @@ -96,9 +96,9 @@ public final class CommonInputTypeStrategy implements InputTypeStrategy { .collect(Collectors.toList()); } - List<Signature.Argument> arguments = + List<Argument> arguments = new ArrayList<>(Collections.nCopies(numberOfMandatoryArguments, COMMON_ARGUMENT)); - arguments.add(Signature.Argument.of("<COMMON>...")); + arguments.add(Argument.ofGroupVarying("COMMON")); return Collections.singletonList(Signature.of(arguments)); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java index b5e117a709a..88aa6877c3e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java @@ -79,12 +79,11 @@ public final class ComparableTypeStrategy implements InputTypeStrategy { if (argumentDataTypes.size() == 1) { final LogicalType argType = argumentDataTypes.get(0).getLogicalType(); if (!areComparable(argType, argType)) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Type '%s' should support %s comparison with itself.", - argType, comparisonToString()); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "Type '%s' should support %s comparison with itself.", + argType, + comparisonToString()); } } else { for (int i = 0; i < argumentDataTypes.size() - 1; i++) { @@ -92,13 +91,13 @@ public final class ComparableTypeStrategy implements InputTypeStrategy { final LogicalType secondType = argumentDataTypes.get(i + 1).getLogicalType(); if (!areComparable(firstType, secondType)) { - if (throwOnFailure) { - throw callContext.newValidationError( - "All types in a comparison should support %s comparison with each other. " - + "Can not compare %s with %s", - comparisonToString(), firstType, secondType); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "All types in a comparison should support %s comparison with each other. " + + "Can not compare %s with %s", + comparisonToString(), + firstType, + secondType); } } } @@ -212,7 +211,8 @@ public final class ComparableTypeStrategy implements InputTypeStrategy { @Override public List<Signature> getExpectedSignatures(FunctionDefinition definition) { - return Collections.singletonList(Signature.of(Signature.Argument.of("<COMPARABLE>..."))); + return Collections.singletonList( + Signature.of(Signature.Argument.ofGroupVarying("COMPARABLE"))); } private Boolean hasRequiredComparison(StructuredType structuredType) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java index 69bbbdf5101..2f55685341e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java @@ -23,7 +23,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.ArgumentTypeStrategy; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import java.util.Optional; @@ -36,19 +36,14 @@ public class CompositeArgumentTypeStrategy implements ArgumentTypeStrategy { CallContext callContext, int argumentPos, boolean throwOnFailure) { DataType dataType = callContext.getArgumentDataTypes().get(argumentPos); if (!LogicalTypeChecks.isCompositeType(dataType.getLogicalType())) { - if (throwOnFailure) { - throw callContext.newValidationError( - "A composite type expected. Got: %s", dataType); - } - return Optional.empty(); + return callContext.fail(throwOnFailure, "A composite type expected. Got: %s", dataType); } return Optional.of(dataType); } @Override - public Signature.Argument getExpectedArgument( - FunctionDefinition functionDefinition, int argumentPos) { - return Signature.Argument.of("<COMPOSITE>"); + public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { + return Argument.ofGroup("COMPOSITE"); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java index 40c83bcb0d4..6f00124131b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java @@ -53,16 +53,12 @@ public final class ConstraintArgumentTypeStrategy implements ArgumentTypeStrateg if (evaluator.test(actualDataTypes)) { return Optional.of(actualDataTypes.get(argumentPos)); } - - if (throwOnFailure) { - throw callContext.newValidationError(constraintMessage, actualDataTypes.toArray()); - } - return Optional.empty(); + return callContext.fail(throwOnFailure, constraintMessage, actualDataTypes.toArray()); } @Override public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { - return Argument.of("<CONSTRAINT>"); + return Argument.ofGroup("CONSTRAINT"); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java index 8933dd0d5dd..5d97eef250a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java @@ -56,23 +56,17 @@ class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy { final DataType dataType = argumentDataTypes.get(0); if (!LogicalTypeChecks.canBeTimeAttributeType(dataType.getLogicalType())) { - if (throwOnFailure) { - throw callContext.newValidationError( - "CURRENT_WATERMARK() must be called with a single rowtime attribute argument, but '%s' cannot be a time attribute.", - dataType.getLogicalType().asSummaryString()); - } - - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "CURRENT_WATERMARK() must be called with a single rowtime attribute argument, but '%s' cannot be a time attribute.", + dataType.getLogicalType().asSummaryString()); } if (!LogicalTypeChecks.isRowtimeAttribute(dataType.getLogicalType())) { - if (throwOnFailure) { - throw callContext.newValidationError( - "The argument of CURRENT_WATERMARK() must be a rowtime attribute, but was '%s'.", - dataType.getLogicalType().asSummaryString()); - } - - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "The argument of CURRENT_WATERMARK() must be a rowtime attribute, but was '%s'.", + dataType.getLogicalType().asSummaryString()); } return Optional.of(Collections.singletonList(dataType)); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java index 1a2d99d7528..caba132f7b4 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java @@ -59,19 +59,18 @@ public final class ExplicitArgumentTypeStrategy implements ArgumentTypeStrategy } // type coercion if (!supportsImplicitCast(actualType, expectedType)) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Unsupported argument type. Expected type '%s' but actual type was '%s'.", - expectedType, actualType); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "Unsupported argument type. Expected type '%s' but actual type was '%s'.", + expectedType, + actualType); } return Optional.of(expectedDataType); } @Override public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { - return Argument.of(expectedDataType.toString()); + return Argument.of(expectedDataType.getLogicalType()); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExtractInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExtractInputTypeStrategy.java new file mode 100644 index 00000000000..94f02142f9f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExtractInputTypeStrategy.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.TimeIntervalUnit; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME; + +/** + * Type strategy for EXTRACT, checking the first value is a valid literal of type {@link + * TimeIntervalUnit}, and that the combination of the second argument type and the interval unit is + * correct. + */ +@Internal +class ExtractInputTypeStrategy implements InputTypeStrategy { + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.of(2); + } + + @Override + public Optional<List<DataType>> inferInputTypes( + CallContext callContext, boolean throwOnFailure) { + final List<DataType> args = callContext.getArgumentDataTypes(); + final LogicalType temporalArg = args.get(1).getLogicalType(); + if (!temporalArg.isAnyOf(LogicalTypeFamily.DATETIME, LogicalTypeFamily.INTERVAL)) { + return callContext.fail( + throwOnFailure, + "EXTRACT requires 2nd argument to be a temporal type, but type is %s", + temporalArg); + } + final Optional<TimeIntervalUnit> timeIntervalUnit = + callContext.getArgumentValue(0, TimeIntervalUnit.class); + if (!timeIntervalUnit.isPresent()) { + return callContext.fail( + throwOnFailure, + "EXTRACT requires 1st argument to be a TimeIntervalUnit literal"); + } + + switch (timeIntervalUnit.get()) { + case MILLENNIUM: + case CENTURY: + case DECADE: + case YEAR: + case QUARTER: + case MONTH: + case WEEK: + case DAY: + case EPOCH: + return Optional.of(args); + case HOUR: + case MINUTE: + case SECOND: + case MILLISECOND: + case MICROSECOND: + case NANOSECOND: + if (temporalArg.isAnyOf(LogicalTypeFamily.TIME, LogicalTypeFamily.TIMESTAMP) + || temporalArg.is(INTERVAL_DAY_TIME)) { + return Optional.of(args); + } + } + + return callContext.fail( + throwOnFailure, + "EXTRACT does not support TimeIntervalUnit %s for type %s", + timeIntervalUnit.get(), + temporalArg); + } + + @Override + public List<Signature> getExpectedSignatures(FunctionDefinition definition) { + return Collections.singletonList( + Signature.of( + Argument.ofGroup(TimeIntervalUnit.class), Argument.ofGroup("TEMPORAL"))); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java index 00212663e89..08470365040 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java @@ -22,7 +22,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.ArgumentTypeStrategy; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; @@ -84,12 +84,11 @@ public final class FamilyArgumentTypeStrategy implements ArgumentTypeStrategy { } if (Objects.equals(expectedNullability, Boolean.FALSE) && actualType.isNullable()) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Unsupported argument type. Expected nullable type of family '%s' but actual type was '%s'.", - expectedFamily, actualType); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "Unsupported argument type. Expected nullable type of family '%s' but actual type was '%s'.", + expectedFamily, + actualType); } // type is part of the family @@ -116,15 +115,13 @@ public final class FamilyArgumentTypeStrategy implements ArgumentTypeStrategy { } @Override - public Signature.Argument getExpectedArgument( - FunctionDefinition functionDefinition, int argumentPos) { - // "< ... >" to indicate that this is not a type + public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { if (Objects.equals(expectedNullability, Boolean.TRUE)) { - return Signature.Argument.of("<" + expectedFamily + " NULL>"); + return Argument.ofGroup(expectedFamily + " NULL"); } else if (Objects.equals(expectedNullability, Boolean.FALSE)) { - return Signature.Argument.of("<" + expectedFamily + " NOT NULL>"); + return Argument.ofGroup(expectedFamily + " NOT NULL"); } - return Signature.Argument.of("<" + expectedFamily + ">"); + return Argument.ofGroup(expectedFamily); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java index da9a36df513..6a41f0c7d9a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java @@ -23,7 +23,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.ArgumentTypeStrategy; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import java.util.Objects; import java.util.Optional; @@ -42,27 +42,20 @@ public final class LiteralArgumentTypeStrategy implements ArgumentTypeStrategy { public Optional<DataType> inferArgumentType( CallContext callContext, int argumentPos, boolean throwOnFailure) { if (!callContext.isArgumentLiteral(argumentPos)) { - if (throwOnFailure) { - throw callContext.newValidationError("Literal expected."); - } - return Optional.empty(); + return callContext.fail(throwOnFailure, "Literal expected."); } if (callContext.isArgumentNull(argumentPos) && !allowNull) { - if (throwOnFailure) { - throw callContext.newValidationError("Literal must not be NULL."); - } - return Optional.empty(); + return callContext.fail(throwOnFailure, "Literal must not be NULL."); } return Optional.of(callContext.getArgumentDataTypes().get(argumentPos)); } @Override - public Signature.Argument getExpectedArgument( - FunctionDefinition functionDefinition, int argumentPos) { + public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { if (allowNull) { - return Signature.Argument.of("<LITERAL>"); + return Argument.ofGroup("LITERAL"); } - return Signature.Argument.of("<LITERAL NOT NULL>"); + return Argument.ofGroup("LITERAL NOT NULL"); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java index 680d78a9dcf..7b4fa1d4bfd 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java @@ -22,7 +22,7 @@ import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.ArgumentTypeStrategy; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.logical.LogicalTypeRoot; import java.util.Optional; @@ -45,9 +45,8 @@ public final class OutputArgumentTypeStrategy implements ArgumentTypeStrategy { } @Override - public Signature.Argument getExpectedArgument( - FunctionDefinition functionDefinition, int argumentPos) { - return Signature.Argument.of("<OUTPUT>"); + public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { + return Argument.ofGroup("OUTPUT"); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java index 5a79875d0e8..26799805aa1 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java @@ -59,12 +59,11 @@ public final class RootArgumentTypeStrategy implements ArgumentTypeStrategy { final LogicalType actualType = actualDataType.getLogicalType(); if (Objects.equals(expectedNullability, Boolean.FALSE) && actualType.isNullable()) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Unsupported argument type. Expected nullable type of root '%s' but actual type was '%s'.", - expectedRoot, actualType); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "Unsupported argument type. Expected nullable type of root '%s' but actual type was '%s'.", + expectedRoot, + actualType); } return findDataType( @@ -75,11 +74,11 @@ public final class RootArgumentTypeStrategy implements ArgumentTypeStrategy { public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { // "< ... >" to indicate that this is not a type if (Objects.equals(expectedNullability, Boolean.TRUE)) { - return Argument.of("<" + expectedRoot + " NULL>"); + return Argument.ofGroup(expectedRoot + " NULL"); } else if (Objects.equals(expectedNullability, Boolean.FALSE)) { - return Argument.of("<" + expectedRoot + " NOT NULL>"); + return Argument.ofGroup(expectedRoot + " NOT NULL"); } - return Argument.of("<" + expectedRoot + ">"); + return Argument.ofGroup(expectedRoot); } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java index d9bd4fa3f0f..81259ca175d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java @@ -83,6 +83,13 @@ public final class SpecificInputTypeStrategies { and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL), JSON_ARGUMENT)); + /** See {@link ExtractInputTypeStrategy}. */ + public static final InputTypeStrategy EXTRACT = new ExtractInputTypeStrategy(); + + /** See {@link TemporalOverlapsInputTypeStrategy}. */ + public static final InputTypeStrategy TEMPORAL_OVERLAPS = + new TemporalOverlapsInputTypeStrategy(); + // -------------------------------------------------------------------------------------------- // Strategies composed of other strategies // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java index d48fc8b1aef..046a618c3a8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java @@ -80,6 +80,9 @@ public final class SpecificTypeStrategies { public static final TypeStrategy INTERNAL_REPLICATE_ROWS = new InternalReplicateRowsTypeStrategy(); + /** See {@link ToTimestampLtzTypeStrategy}. */ + public static final TypeStrategy TO_TIMESTAMP_LTZ = new ToTimestampLtzTypeStrategy(); + private SpecificTypeStrategies() { // no instantiation } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java index d8df98e201d..c847042c3a0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java @@ -77,12 +77,11 @@ public final class SubsequenceInputTypeStrategy implements InputTypeStrategy { if (splitDataTypes.isPresent()) { result.addAll(splitDataTypes.get()); } else { - if (throwOnFailure) { - throw callContext.newValidationError( - "Could not infer arguments in range: [%d, %d].", - argumentsSplit.startIndex, argumentsSplit.endIndex); - } - return Optional.empty(); + return callContext.fail( + throwOnFailure, + "Could not infer arguments in range: [%d, %d].", + argumentsSplit.startIndex, + argumentsSplit.endIndex); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java index 9e6b471f8b7..9a68ed7af64 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java @@ -27,17 +27,27 @@ import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.Signature; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import java.util.Arrays; +import java.util.HashSet; import java.util.Objects; import java.util.Optional; +import java.util.Set; /** Strategy for a symbol argument of a specific {@link TableSymbol} enum. */ @Internal -public class SymbolArgumentTypeStrategy implements ArgumentTypeStrategy { +public class SymbolArgumentTypeStrategy<T extends Enum<? extends TableSymbol>> + implements ArgumentTypeStrategy { - private final Class<? extends Enum<? extends TableSymbol>> symbolClass; + private final Class<T> symbolClass; + private final Set<T> allowedVariants; - public SymbolArgumentTypeStrategy(Class<? extends Enum<? extends TableSymbol>> symbolClass) { + public SymbolArgumentTypeStrategy(Class<T> symbolClass) { + this(symbolClass, new HashSet<>(Arrays.asList(symbolClass.getEnumConstants()))); + } + + public SymbolArgumentTypeStrategy(Class<T> symbolClass, Set<T> allowedVariants) { this.symbolClass = symbolClass; + this.allowedVariants = allowedVariants; } @Override @@ -46,27 +56,30 @@ public class SymbolArgumentTypeStrategy implements ArgumentTypeStrategy { final DataType argumentType = callContext.getArgumentDataTypes().get(argumentPos); if (argumentType.getLogicalType().getTypeRoot() != LogicalTypeRoot.SYMBOL || !callContext.isArgumentLiteral(argumentPos)) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Unsupported argument type. Expected symbol type '%s' but actual type was '%s'.", - symbolClass.getSimpleName(), argumentType); - } else { - return Optional.empty(); - } + return callContext.fail( + throwOnFailure, + "Unsupported argument type. Expected symbol type '%s' but actual type was '%s'.", + symbolClass.getSimpleName(), + argumentType); } - if (!callContext.getArgumentValue(argumentPos, symbolClass).isPresent()) { - if (throwOnFailure) { - throw callContext.newValidationError( - "Unsupported argument symbol type. Expected symbol '%s' but actual symbol was %s.", - symbolClass.getSimpleName(), - callContext - .getArgumentValue(argumentPos, Enum.class) - .map(e -> "'" + e.getClass().getSimpleName() + "'") - .orElse("invalid")); - } else { - return Optional.empty(); - } + Optional<T> val = callContext.getArgumentValue(argumentPos, symbolClass); + if (!val.isPresent()) { + return callContext.fail( + throwOnFailure, + "Unsupported argument symbol type. Expected symbol '%s' but actual symbol was %s.", + symbolClass.getSimpleName(), + callContext + .getArgumentValue(argumentPos, Enum.class) + .map(e -> "'" + e.getClass().getSimpleName() + "'") + .orElse("invalid")); + } + if (!this.allowedVariants.contains(val.get())) { + return callContext.fail( + throwOnFailure, + "Unsupported argument symbol variant. Expected one of the following variants %s but actual symbol was %s.", + this.allowedVariants, + val.get()); } return Optional.of(argumentType); @@ -75,7 +88,7 @@ public class SymbolArgumentTypeStrategy implements ArgumentTypeStrategy { @Override public Signature.Argument getExpectedArgument( FunctionDefinition functionDefinition, int argumentPos) { - return Signature.Argument.of(String.format("<%s>", symbolClass.getSimpleName())); + return Signature.Argument.ofGroup(symbolClass); } // --------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TemporalOverlapsInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TemporalOverlapsInputTypeStrategy.java new file mode 100644 index 00000000000..92bda0ca15c --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TemporalOverlapsInputTypeStrategy.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** Type strategy of {@code TO_TIMESTAMP_LTZ}. */ +@Internal +class TemporalOverlapsInputTypeStrategy implements InputTypeStrategy { + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.of(4); + } + + @Override + public Optional<List<DataType>> inferInputTypes( + CallContext callContext, boolean throwOnFailure) { + final List<DataType> args = callContext.getArgumentDataTypes(); + final LogicalType leftTimePoint = args.get(0).getLogicalType(); + final LogicalType leftTemporal = args.get(1).getLogicalType(); + final LogicalType rightTimePoint = args.get(2).getLogicalType(); + final LogicalType rightTemporal = args.get(3).getLogicalType(); + + if (!leftTimePoint.is(LogicalTypeFamily.DATETIME)) { + return callContext.fail( + throwOnFailure, + "TEMPORAL_OVERLAPS requires 1st argument 'leftTimePoint' to be a DATETIME type, but is %s", + leftTimePoint); + } + if (!rightTimePoint.is(LogicalTypeFamily.DATETIME)) { + return callContext.fail( + throwOnFailure, + "TEMPORAL_OVERLAPS requires 3rd argument 'rightTimePoint' to be a DATETIME type, but is %s", + rightTimePoint); + } + + if (!leftTimePoint.equals(rightTimePoint)) { + return callContext.fail( + throwOnFailure, + "TEMPORAL_OVERLAPS requires 'leftTimePoint' and 'rightTimePoint' arguments to be of the same type, but is %s != %s", + leftTimePoint, + rightTimePoint); + } + + // leftTemporal is point, then it must be comparable with leftTimePoint + if (leftTemporal.is(LogicalTypeFamily.DATETIME)) { + if (!leftTemporal.equals(leftTimePoint)) { + return callContext.fail( + throwOnFailure, + "TEMPORAL_OVERLAPS requires 'leftTemporal' and 'leftTimePoint' arguments to be of the same type if 'leftTemporal' is a DATETIME, but is %s != %s", + leftTemporal, + leftTimePoint); + } + } else if (!leftTemporal.is(LogicalTypeFamily.INTERVAL)) { + return callContext.fail( + throwOnFailure, + "TEMPORAL_OVERLAPS requires 2nd argument 'leftTemporal' to be DATETIME or INTERVAL type, but is %s", + leftTemporal); + } + + // rightTemporal is point, then it must be comparable with rightTimePoint + if (rightTemporal.is(LogicalTypeFamily.DATETIME)) { + if (!rightTemporal.equals(rightTimePoint)) { + return callContext.fail( + throwOnFailure, + "TEMPORAL_OVERLAPS requires 'rightTemporal' and 'rightTimePoint' arguments to be of the same type if 'rightTemporal' is a DATETIME, but is %s != %s", + rightTemporal, + rightTimePoint); + } + } else if (!rightTemporal.is(LogicalTypeFamily.INTERVAL)) { + return callContext.fail( + throwOnFailure, + "TEMPORAL_OVERLAPS requires 4th argument 'rightTemporal' to be DATETIME or INTERVAL type, but is %s", + rightTemporal); + } + + return Optional.of(args); + } + + @Override + public List<Signature> getExpectedSignatures(FunctionDefinition definition) { + return Collections.singletonList( + Signature.of( + Argument.ofGroup("leftTimePoint", LogicalTypeFamily.DATETIME), + Argument.ofGroup("leftTemporal", "TEMPORAL"), + Argument.ofGroup("rightTimePoint", LogicalTypeFamily.DATETIME), + Argument.ofGroup("rightTemporal", "TEMPORAL"))); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java similarity index 68% rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java index f186d7fec5f..722dd63e51d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java @@ -19,31 +19,23 @@ package org.apache.flink.table.types.inference.strategies; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeStrategy; -import org.apache.flink.util.Preconditions; -import java.util.List; import java.util.Optional; -/** Type strategy that returns the n-th input argument. */ +/** Type strategy of {@code TO_TIMESTAMP_LTZ}. */ @Internal -public final class UseArgumentTypeStrategy implements TypeStrategy { - - private final int pos; - - public UseArgumentTypeStrategy(int pos) { - Preconditions.checkArgument(pos >= 0); - this.pos = pos; - } +public class ToTimestampLtzTypeStrategy implements TypeStrategy { @Override public Optional<DataType> inferType(CallContext callContext) { - final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); - if (pos >= argumentDataTypes.size()) { - return Optional.empty(); + if (callContext.isArgumentLiteral(1)) { + final int precision = callContext.getArgumentValue(1, Integer.class).get(); + return Optional.of(DataTypes.TIMESTAMP_LTZ(precision)); } - return Optional.of(argumentDataTypes.get(pos)); + return Optional.of(DataTypes.TIMESTAMP_LTZ(3)); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java index e1d74a2ebe5..a5b09261eb5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java @@ -46,6 +46,6 @@ public class TypeLiteralArgumentTypeStrategy implements ArgumentTypeStrategy { @Override public Signature.Argument getExpectedArgument( FunctionDefinition functionDefinition, int argumentPos) { - return Signature.Argument.of("<DATA TYPE NOT NULL>"); + return Signature.Argument.ofGroup("DATA TYPE NOT NULL"); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java index 2755bf1d7a8..37b909015be 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java @@ -102,9 +102,9 @@ public final class VaryingSequenceInputTypeStrategy implements InputTypeStrategy final Signature.Argument newArg; final String type = varyingArgument.getType(); if (argumentNames == null) { - newArg = Signature.Argument.of(type + "..."); + newArg = Signature.Argument.ofVarying(type); } else { - newArg = Signature.Argument.of(argumentNames.get(constantArgumentCount), type + "..."); + newArg = Signature.Argument.ofVarying(argumentNames.get(constantArgumentCount), type); } final List<Signature.Argument> arguments = new ArrayList<>(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java index 008a05a0ae1..a79fefacbde 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java @@ -95,6 +95,16 @@ public abstract class LogicalType implements Serializable { return Arrays.stream(typeRoots).anyMatch(tr -> this.typeRoot == tr); } + /** + * Returns whether the root of the type is part of at least one family of the {@code typeFamily} + * or not. + * + * @param typeFamilies The families to check against for equality + */ + public boolean isAnyOf(LogicalTypeFamily... typeFamilies) { + return Arrays.stream(typeFamilies).anyMatch(tf -> this.typeRoot.getFamilies().contains(tf)); + } + /** * Returns whether the family type of the type equals to the {@code family} or not. * diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/ExtractFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java similarity index 65% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/ExtractFunctionITCase.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java index deaebadd8be..28434e009a5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/ExtractFunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java @@ -18,26 +18,41 @@ package org.apache.flink.table.planner.functions; +import org.apache.flink.table.api.JsonExistsOnError; import org.apache.flink.table.expressions.TimeIntervalUnit; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import java.time.Duration; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.stream.Stream; import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.BOOLEAN; import static org.apache.flink.table.api.DataTypes.DATE; +import static org.apache.flink.table.api.DataTypes.DAY; +import static org.apache.flink.table.api.DataTypes.HOUR; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.INTERVAL; +import static org.apache.flink.table.api.DataTypes.SECOND; +import static org.apache.flink.table.api.DataTypes.TIME; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; +import static org.apache.flink.table.api.Expressions.temporalOverlaps; -/** Test {@link BuiltInFunctionDefinitions#EXTRACT} and its return type. */ -class ExtractFunctionITCase extends BuiltInFunctionTestBase { +/** Test time-related built-in functions. */ +class TimeFunctionsITCase extends BuiltInFunctionTestBase { @Override Stream<TestSetSpec> getTestSetSpecs() { + return Stream.concat(extractTestCases(), temporalOverlapsTestCases()); + } + + private Stream<TestSetSpec> extractTestCases() { return Stream.of( TestSetSpec.forFunction(BuiltInFunctionDefinitions.EXTRACT) .onFieldsWithData( @@ -45,9 +60,15 @@ class ExtractFunctionITCase extends BuiltInFunctionTestBase { LocalDateTime.of(2020, 2, 29, 1, 56, 59, 987654321), null, LocalDate.of(1990, 10, 14), - Instant.ofEpochMilli(100000012)) + Instant.ofEpochMilli(100000012), + true) .andDataTypes( - TIMESTAMP(), TIMESTAMP(), TIMESTAMP(), DATE(), TIMESTAMP_LTZ(3)) + TIMESTAMP(), + TIMESTAMP(), + TIMESTAMP(), + DATE(), + TIMESTAMP_LTZ(3), + BOOLEAN()) .testResult( $("f0").extract(TimeIntervalUnit.NANOSECOND), "EXTRACT(NANOSECOND FROM f0)", @@ -65,7 +86,11 @@ class ExtractFunctionITCase extends BuiltInFunctionTestBase { BIGINT().nullable()) .testSqlValidationError( "EXTRACT(NANOSECOND FROM f3)", "NANOSECOND can not be applied") - .testSqlResult("EXTRACT(NANOSECOND FROM f4)", 12000000L, BIGINT()) + .testResult( + $("f4").extract(TimeIntervalUnit.NANOSECOND), + "EXTRACT(NANOSECOND FROM f4)", + 12000000L, + BIGINT()) .testResult( $("f0").extract(TimeIntervalUnit.MICROSECOND), "EXTRACT(MICROSECOND FROM f0)", @@ -96,8 +121,11 @@ class ExtractFunctionITCase extends BuiltInFunctionTestBase { "EXTRACT(MILLISECOND FROM f2)", null, BIGINT().nullable()) - // Table API does not support this yet (see FLINK-13785) - .testSqlResult("EXTRACT(MILLISECOND FROM f4)", 12L, BIGINT().nullable()) + .testResult( + $("f4").extract(TimeIntervalUnit.MILLISECOND), + "EXTRACT(MILLISECOND FROM f4)", + 12L, + BIGINT().nullable()) .testResult( $("f0").extract(TimeIntervalUnit.SECOND), "EXTRACT(SECOND FROM f0)", @@ -294,6 +322,107 @@ class ExtractFunctionITCase extends BuiltInFunctionTestBase { call("EXTRACT", TimeIntervalUnit.EPOCH, $("f2")), "EXTRACT(EPOCH FROM f2)", null, - BIGINT().nullable())); + BIGINT().nullable()) + .testTableApiValidationError( + call("EXTRACT", TimeIntervalUnit.EPOCH, $("f5")), + "EXTRACT requires 2nd argument to be a temporal type, but type is BOOLEAN") + .testTableApiValidationError( + call("EXTRACT", JsonExistsOnError.ERROR, $("f2")), + "EXTRACT requires 1st argument to be a TimeIntervalUnit literal")); + } + + private Stream<TestSetSpec> temporalOverlapsTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS) + .onFieldsWithData( + LocalTime.of(2, 55, 0), + Duration.ofHours(1), + LocalTime.of(3, 30, 0), + Duration.ofHours(2)) + .andDataTypes(TIME(), INTERVAL(HOUR()), TIME(), INTERVAL(HOUR())) + .testResult( + temporalOverlaps($("f0"), $("f1"), $("f2"), $("f3")), + "(f0, f1) OVERLAPS (f2, f3)", + true, + BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS) + .onFieldsWithData( + LocalTime.of(9, 0, 0), + LocalTime.of(9, 30, 0), + LocalTime.of(9, 29, 0), + LocalTime.of(9, 31, 0), + LocalTime.of(10, 0, 0), + LocalTime.of(10, 15, 0), + Duration.ofHours(3)) + .andDataTypes( + TIME(), TIME(), TIME(), TIME(), TIME(), TIME(), INTERVAL(HOUR())) + .testResult( + temporalOverlaps($("f0"), $("f1"), $("f2"), $("f3")), + "(f0, f1) OVERLAPS (f2, f3)", + true, + BOOLEAN()) + .testResult( + temporalOverlaps($("f0"), $("f4"), $("f5"), $("f6")), + "(f0, f4) OVERLAPS (f5, f6)", + false, + BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS) + .onFieldsWithData( + LocalDate.of(2011, 3, 10), + Duration.ofDays(10), + LocalDate.of(2011, 3, 19), + Duration.ofDays(10)) + .andDataTypes(DATE(), INTERVAL(DAY()), DATE(), INTERVAL(DAY())) + .testResult( + temporalOverlaps($("f0"), $("f1"), $("f2"), $("f3")), + "(f0, f1) OVERLAPS (f2, f3)", + true, + BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS) + .onFieldsWithData( + LocalDateTime.of(2011, 3, 10, 5, 2, 2), + Duration.ofSeconds(0), + LocalDateTime.of(2011, 3, 10, 5, 2, 2), + LocalDateTime.of(2011, 3, 10, 5, 2, 1), + LocalDateTime.of(2011, 3, 10, 5, 2, 2, 1000000), + LocalDateTime.of(2011, 3, 10, 5, 2, 2, 2000000)) + .andDataTypes( + TIMESTAMP(), + INTERVAL(SECOND()), + TIMESTAMP(), + TIMESTAMP(), + TIMESTAMP(), + TIMESTAMP()) + .testResult( + temporalOverlaps($("f0"), $("f1"), $("f2"), $("f3")), + "(f0, f1) OVERLAPS (f2, f3)", + true, + BOOLEAN()) + .testResult( + temporalOverlaps($("f4"), $("f1"), $("f5"), $("f5")), + "(f4, f1) OVERLAPS (f5, f5)", + false, + BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS) + .onFieldsWithData( + 1, + LocalDateTime.of(2011, 3, 10, 5, 2, 2), + LocalDate.of(2011, 3, 10)) + .andDataTypes(INT(), TIMESTAMP(), DATE()) + .testTableApiValidationError( + temporalOverlaps($("f0"), $("f1"), $("f1"), $("f1")), + "TEMPORAL_OVERLAPS requires 1st argument 'leftTimePoint' to be a DATETIME type, but is INT") + .testTableApiValidationError( + temporalOverlaps($("f1"), $("f1"), $("f0"), $("f1")), + "TEMPORAL_OVERLAPS requires 3rd argument 'rightTimePoint' to be a DATETIME type, but is INT") + .testTableApiValidationError( + temporalOverlaps($("f1"), $("f1"), $("f2"), $("f2")), + "TEMPORAL_OVERLAPS requires 'leftTimePoint' and 'rightTimePoint' arguments to be of the same type, but is TIMESTAMP(6) != DATE") + .testTableApiValidationError( + temporalOverlaps($("f2"), $("f1"), $("f2"), $("f2")), + "TEMPORAL_OVERLAPS requires 'leftTemporal' and 'leftTimePoint' arguments to be of the same type if 'leftTemporal' is a DATETIME, but is TIMESTAMP(6) != DATE") + .testTableApiValidationError( + temporalOverlaps($("f1"), $("f0"), $("f1"), $("f0")), + "TEMPORAL_OVERLAPS requires 2nd argument 'leftTemporal' to be DATETIME or INTERVAL type, but is INT")); } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala index 93d9e186b00..953c5612446 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala @@ -240,9 +240,6 @@ class SqlExpressionTest extends ExpressionTestBase { testSqlApi("FLOOR(TIME '12:44:31' TO MINUTE)", "12:44:00") testSqlApi("CEIL(TIME '12:44:31' TO MINUTE)", "12:45:00") testSqlApi("QUARTER(DATE '2016-04-12')", "2") - testSqlApi( - "(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR)", - "TRUE") } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala index 68fcf093fd5..4761ef7c0a4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala @@ -1272,8 +1272,8 @@ class TemporalTypesTest extends ExpressionTestBase { ) testExpectedTableApiException( toTimestampLtz("test_string_type", 0), - "toTimestampLtz(test_string_type, 0) requires numeric type for the first input," + - " but the actual type 'String'." + "Unsupported argument type. " + + "Expected type of family 'NUMERIC' but actual type was 'CHAR(16) NOT NULL'" ) // invalid type for the second input @@ -1286,8 +1286,8 @@ class TemporalTypesTest extends ExpressionTestBase { testExpectedTableApiException( toTimestampLtz(123, "test_string_type"), - "toTimestampLtz(123, test_string_type) requires numeric type for the second input," + - " but the actual type 'String'." + "Unsupported argument type. " + + "Expected type of family 'INTEGER_NUMERIC' but actual type was 'CHAR(16) NOT NULL'" ) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index b5ddd2dad00..a5105607f83 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.{MapFunction, RichFunction, RichMap import org.apache.flink.api.common.functions.util.RuntimeUDFContext import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.configuration.Configuration +import org.apache.flink.core.testutils.FlinkAssertions import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api import org.apache.flink.table.api.{EnvironmentSettings, TableException, ValidationException} @@ -49,6 +50,7 @@ import org.apache.calcite.rel.logical.LogicalCalc import org.apache.calcite.rel.rules._ import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.{After, Before, Rule} import org.junit.Assert.{assertEquals, assertTrue, fail} import org.junit.rules.ExpectedException @@ -153,23 +155,18 @@ abstract class ExpressionTestBase { invalidTableApiExprs.foreach { case (tableExpr, keywords, clazz) => { - try { - val invalidExprs = mutable.ArrayBuffer[(String, RexNode, String)]() - addTableApiTestExpr(tableExpr, keywords, invalidExprs, clazz) - evaluateGivenExprs(invalidExprs) - fail(s"Expected a $clazz, but no exception is thrown.") - } catch { - case e if e.getClass == clazz => - if (keywords != null) { - assertTrue( - s"The actual exception message \n${e.getMessage}\n" + - s"doesn't contain expected keyword \n$keywords\n", - e.getMessage.contains(keywords)) - } - case e: Throwable => - e.printStackTrace() - fail(s"Expected throw ${clazz.getSimpleName}, but is $e.") + val assertion = if (keywords != null) { + FlinkAssertions.anyCauseMatches(clazz, keywords) + } else { + FlinkAssertions.anyCauseMatches(clazz) } + + assertThatThrownBy( + () => { + val invalidExprs = mutable.ArrayBuffer[(String, RexNode, String)]() + addTableApiTestExpr(tableExpr, keywords, invalidExprs, clazz) + evaluateGivenExprs(invalidExprs) + }).satisfies(assertion) } } }