This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff7a0f6f25748933729bc94038444b3d4e233315
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Mon Mar 21 14:39:56 2022 +0100

    [FLINK-13785][table] Port time functions to new type inference
    
    This closes #19190.
---
 .../java/functions/InternalRowMergerFunction.java  |  11 +-
 .../functions/BuiltInFunctionDefinitions.java      |  66 +++++++---
 .../flink/table/types/inference/CallContext.java   |  16 ++-
 .../table/types/inference/InputTypeStrategies.java |  27 +++-
 .../flink/table/types/inference/Signature.java     |  73 ++++++++++-
 .../table/types/inference/TypeStrategies.java      |   9 +-
 .../strategies/AnyArgumentTypeStrategy.java        |   7 +-
 ...ategy.java => ArgumentMappingTypeStrategy.java} |  12 +-
 .../strategies/CastInputTypeStrategy.java          |  11 +-
 .../strategies/CommonArgumentTypeStrategy.java     |   7 +-
 .../strategies/CommonInputTypeStrategy.java        |  16 +--
 .../strategies/ComparableTypeStrategy.java         |  28 ++--
 .../strategies/CompositeArgumentTypeStrategy.java  |  13 +-
 .../strategies/ConstraintArgumentTypeStrategy.java |   8 +-
 .../CurrentWatermarkInputTypeStrategy.java         |  22 ++--
 .../strategies/ExplicitArgumentTypeStrategy.java   |  13 +-
 .../strategies/ExtractInputTypeStrategy.java       | 108 +++++++++++++++
 .../strategies/FamilyArgumentTypeStrategy.java     |  23 ++--
 .../strategies/LiteralArgumentTypeStrategy.java    |  19 +--
 .../strategies/OutputArgumentTypeStrategy.java     |   7 +-
 .../strategies/RootArgumentTypeStrategy.java       |  17 ++-
 .../strategies/SpecificInputTypeStrategies.java    |   7 +
 .../strategies/SpecificTypeStrategies.java         |   3 +
 .../strategies/SubsequenceInputTypeStrategy.java   |  11 +-
 .../strategies/SymbolArgumentTypeStrategy.java     |  59 +++++----
 .../TemporalOverlapsInputTypeStrategy.java         | 120 +++++++++++++++++
 ...rategy.java => ToTimestampLtzTypeStrategy.java} |  22 +---
 .../TypeLiteralArgumentTypeStrategy.java           |   2 +-
 .../VaryingSequenceInputTypeStrategy.java          |   4 +-
 .../flink/table/types/logical/LogicalType.java     |  10 ++
 ...unctionITCase.java => TimeFunctionsITCase.java} | 145 +++++++++++++++++++--
 .../planner/expressions/SqlExpressionTest.scala    |   3 -
 .../planner/expressions/TemporalTypesTest.scala    |   8 +-
 .../expressions/utils/ExpressionTestBase.scala     |  29 ++---
 34 files changed, 710 insertions(+), 226 deletions(-)

diff --git 
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
 
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
index f0cfe4fc993..389c40fd0c5 100644
--- 
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
+++ 
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
@@ -80,11 +80,8 @@ public final class InternalRowMergerFunction extends 
ScalarFunction {
                                 if (arg0.getLogicalType().getTypeRoot() != 
LogicalTypeRoot.ROW
                                         || arg1.getLogicalType().getTypeRoot()
                                                 != LogicalTypeRoot.ROW) {
-                                    if (throwOnFailure) {
-                                        throw callContext.newValidationError(
-                                                "Two row arguments expected.");
-                                    }
-                                    return Optional.empty();
+                                    return callContext.fail(
+                                            throwOnFailure, "Two row arguments 
expected.");
                                 }
                                 // keep the original logical type but express 
that both arguments
                                 // should use internal data structures
@@ -99,7 +96,9 @@ public final class InternalRowMergerFunction extends 
ScalarFunction {
                                     FunctionDefinition definition) {
                                 // this helps in printing nice error messages
                                 return Collections.singletonList(
-                                        Signature.of(Argument.of("ROW"), 
Argument.of("ROW")));
+                                        Signature.of(
+                                                
Argument.ofGroup(LogicalTypeRoot.ROW),
+                                                
Argument.ofGroup(LogicalTypeRoot.ROW)));
                             }
                         })
                 .outputTypeStrategy(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 725d823a1bf..fe0d40ef098 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.JsonQueryWrapper;
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
@@ -48,6 +49,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
 import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
 import static org.apache.flink.table.functions.FunctionKind.OTHER;
 import static org.apache.flink.table.functions.FunctionKind.SCALAR;
@@ -365,7 +374,7 @@ public final class BuiltInFunctionDefinitions {
                     .name("count")
                     .kind(AGGREGATE)
                     .inputTypeStrategy(sequence(ANY)) // COUNT(*) is not 
supported yet
-                    .outputTypeStrategy(explicit(DataTypes.BIGINT().notNull()))
+                    .outputTypeStrategy(explicit(BIGINT().notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition MAX =
@@ -463,7 +472,7 @@ public final class BuiltInFunctionDefinitions {
                     .name("charLength")
                     .kind(SCALAR)
                     
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
-                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.INT())))
+                    .outputTypeStrategy(nullableIfArgs(explicit(INT())))
                     .build();
 
     public static final BuiltInFunctionDefinition INIT_CAP =
@@ -581,7 +590,7 @@ public final class BuiltInFunctionDefinitions {
                             sequence(
                                     
logical(LogicalTypeFamily.CHARACTER_STRING),
                                     
logical(LogicalTypeFamily.CHARACTER_STRING)))
-                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.INT())))
+                    .outputTypeStrategy(nullableIfArgs(explicit(INT())))
                     .build();
 
     public static final BuiltInFunctionDefinition OVERLAY =
@@ -1179,7 +1188,7 @@ public final class BuiltInFunctionDefinitions {
                                     sequence(
                                             logical(LogicalTypeRoot.INTEGER),
                                             logical(LogicalTypeRoot.INTEGER))))
-                    .outputTypeStrategy(explicit(DataTypes.INT().notNull()))
+                    .outputTypeStrategy(explicit(INT().notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition BIN =
@@ -1222,76 +1231,103 @@ public final class BuiltInFunctionDefinitions {
             BuiltInFunctionDefinition.newBuilder()
                     .name("extract")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .inputTypeStrategy(SpecificInputTypeStrategies.EXTRACT)
+                    .outputTypeStrategy(nullableIfArgs(explicit(BIGINT())))
                     .build();
 
     public static final BuiltInFunctionDefinition CURRENT_DATE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("currentDate")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .outputTypeStrategy(explicit(DATE().notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition CURRENT_TIME =
             BuiltInFunctionDefinition.newBuilder()
                     .name("currentTime")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .outputTypeStrategy(explicit(TIME().notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition CURRENT_TIMESTAMP =
             BuiltInFunctionDefinition.newBuilder()
                     .name("currentTimestamp")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition CURRENT_ROW_TIMESTAMP =
             BuiltInFunctionDefinition.newBuilder()
                     .name("currentRowTimestamp")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .outputTypeStrategy(explicit(TIMESTAMP_LTZ(3).notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition LOCAL_TIME =
             BuiltInFunctionDefinition.newBuilder()
                     .name("localTime")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .outputTypeStrategy(explicit(TIME().notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition LOCAL_TIMESTAMP =
             BuiltInFunctionDefinition.newBuilder()
                     .name("localTimestamp")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .outputTypeStrategy(explicit(TIMESTAMP(3).notNull()))
                     .build();
 
     public static final BuiltInFunctionDefinition TEMPORAL_OVERLAPS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("temporalOverlaps")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    
.inputTypeStrategy(SpecificInputTypeStrategies.TEMPORAL_OVERLAPS)
+                    .outputTypeStrategy(nullableIfArgs(explicit(BOOLEAN())))
                     .build();
 
     public static final BuiltInFunctionDefinition DATE_FORMAT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("dateFormat")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(
+                                            
logical(LogicalTypeFamily.TIMESTAMP),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING)),
+                                    sequence(
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING))))
+                    .outputTypeStrategy(nullableIfArgs(explicit(STRING())))
                     .build();
 
     public static final BuiltInFunctionDefinition TIMESTAMP_DIFF =
             BuiltInFunctionDefinition.newBuilder()
                     .name("timestampDiff")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .inputTypeStrategy(
+                            sequence(
+                                    symbol(
+                                            TimePointUnit.YEAR,
+                                            TimePointUnit.QUARTER,
+                                            TimePointUnit.MONTH,
+                                            TimePointUnit.WEEK,
+                                            TimePointUnit.DAY,
+                                            TimePointUnit.HOUR,
+                                            TimePointUnit.MINUTE,
+                                            TimePointUnit.SECOND),
+                                    logical(LogicalTypeFamily.DATETIME),
+                                    logical(LogicalTypeFamily.DATETIME)))
+                    .outputTypeStrategy(nullableIfArgs(explicit(INT())))
                     .build();
     public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ =
             BuiltInFunctionDefinition.newBuilder()
                     .name("toTimestampLtz")
                     .kind(SCALAR)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
+                    .inputTypeStrategy(
+                            sequence(
+                                    logical(LogicalTypeFamily.NUMERIC),
+                                    logical(LogicalTypeFamily.INTEGER_NUMERIC, 
false)))
+                    
.outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
                     .build();
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java
index 13cfc11e2ca..62fdd9f4d8a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java
@@ -86,13 +86,27 @@ public interface CallContext {
     Optional<DataType> getOutputDataType();
 
     /**
-     * Creates a validation error for exiting the type inference process with 
a meaningful
+     * Creates a validation exception for exiting the type inference process 
with a meaningful
      * exception.
      */
     default ValidationException newValidationError(String message, Object... 
args) {
         return new ValidationException(String.format(message, args));
     }
 
+    /**
+     * Helper method for handling failures during the type inference process 
while considering the
+     * {@code throwOnFailure} flag.
+     *
+     * <p>Shorthand for {@code if (throwOnFailure) throw 
ValidationException(...) else return
+     * Optional.empty()}.
+     */
+    default <T> Optional<T> fail(boolean throwOnFailure, String message, 
Object... args) {
+        if (throwOnFailure) {
+            throw newValidationError(message, args);
+        }
+        return Optional.empty();
+    }
+
     /**
      * Returns whether the function call happens as part of an aggregation 
that defines grouping
      * columns.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index 2f3feea6259..fe7157453e0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -50,6 +50,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Strategies for inferring and validating input arguments in a function call.
@@ -304,10 +305,30 @@ public final class InputTypeStrategies {
         return new OrArgumentTypeStrategy(Arrays.asList(strategies));
     }
 
-    /** Strategy for a symbol argument of a specific {@link TableSymbol} enum. 
*/
-    public static SymbolArgumentTypeStrategy symbol(
+    /**
+     * Strategy for a symbol argument of a specific {@link TableSymbol} enum.
+     *
+     * <p>A symbol is implied to be a literal argument.
+     */
+    public static SymbolArgumentTypeStrategy<?> symbol(
             Class<? extends Enum<? extends TableSymbol>> clazz) {
-        return new SymbolArgumentTypeStrategy(clazz);
+        return new SymbolArgumentTypeStrategy<>(clazz);
+    }
+
+    /**
+     * Strategy for a symbol argument of a specific {@link TableSymbol} enum, 
with value being one
+     * of the provided variants.
+     *
+     * <p>A symbol is implied to be a literal argument.
+     */
+    @SafeVarargs
+    @SuppressWarnings("unchecked")
+    public static <T extends Enum<? extends TableSymbol>> 
SymbolArgumentTypeStrategy<T> symbol(
+            T firstAllowedVariant, T... otherAllowedVariants) {
+        return new SymbolArgumentTypeStrategy<T>(
+                (Class<T>) firstAllowedVariant.getClass(),
+                Stream.concat(Stream.of(firstAllowedVariant), 
Arrays.stream(otherAllowedVariants))
+                        .collect(Collectors.toSet()));
     }
 
     /**
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java
index e845ca8f217..0d9495a365c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/Signature.java
@@ -19,7 +19,11 @@
 package org.apache.flink.table.types.inference;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.TableSymbol;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -61,8 +65,12 @@ public final class Signature {
     /**
      * Representation of a single argument in a signature.
      *
-     * <p>The type is represented as {@link String} in order to also express 
type families or
-     * varargs.
+     * <p>The argument is represented as a {@link String} in order to express 
both explicit types
+     * (see {@code of(...)}) and groups/families of types (see {@code 
ofGroup(...)}).
+     *
+     * <p>The general string formatting convention is to use {@code T} for 
explicit types, {@code
+     * &lt;T&gt;} for groups/families of types, {@code T...} for varargs, and 
{@code [T]} for
+     * conditions.
      */
     @PublicEvolving
     public static final class Argument {
@@ -76,16 +84,71 @@ public final class Signature {
             this.type = Preconditions.checkNotNull(type);
         }
 
-        /** Returns an instance of {@link Argument}. */
         public static Argument of(String name, String type) {
-            return new Argument(Preconditions.checkNotNull(name, "Name must 
not be null."), type);
+            return new Argument(name, type);
+        }
+
+        public static Argument of(String name, LogicalType type) {
+            return of(name, type.asSummaryString());
+        }
+
+        public static Argument ofVarying(String name, String type) {
+            return new Argument(name, type + "...");
         }
 
-        /** Returns an instance of {@link Argument}. */
         public static Argument of(String type) {
             return new Argument(null, type);
         }
 
+        public static Argument of(LogicalType type) {
+            return of(type.asSummaryString());
+        }
+
+        public static Argument ofVarying(String type) {
+            return new Argument(null, type + "...");
+        }
+
+        public static Argument ofGroup(String name, String typeGroup) {
+            return new Argument(name, "<" + typeGroup + ">");
+        }
+
+        public static Argument ofGroup(String name, LogicalTypeRoot typeRoot) {
+            return ofGroup(name, typeRoot.name());
+        }
+
+        public static Argument ofGroup(String name, LogicalTypeFamily 
typeFamily) {
+            return ofGroup(name, typeFamily.name());
+        }
+
+        public static Argument ofGroup(
+                String name, Class<? extends Enum<? extends TableSymbol>> 
symbol) {
+            return ofGroup(name, symbol.getSimpleName());
+        }
+
+        public static Argument ofGroupVarying(String name, String typeGroup) {
+            return new Argument(name, "<" + typeGroup + ">...");
+        }
+
+        public static Argument ofGroup(String typeGroup) {
+            return ofGroup(null, typeGroup);
+        }
+
+        public static Argument ofGroup(LogicalTypeRoot typeRoot) {
+            return ofGroup(typeRoot.name());
+        }
+
+        public static Argument ofGroup(LogicalTypeFamily typeFamily) {
+            return ofGroup(typeFamily.name());
+        }
+
+        public static Argument ofGroup(Class<? extends Enum<? extends 
TableSymbol>> symbol) {
+            return ofGroup(symbol.getSimpleName());
+        }
+
+        public static Argument ofGroupVarying(String typeGroup) {
+            return new Argument(null, "<" + typeGroup + ">...");
+        }
+
         public Optional<String> getName() {
             return Optional.ofNullable(name);
         }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
index 421fd71b405..a518ba03b6a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.types.inference;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.DataType;
+import 
org.apache.flink.table.types.inference.strategies.ArgumentMappingTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.CommonTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ExplicitTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.FirstTypeStrategy;
@@ -28,7 +29,6 @@ import 
org.apache.flink.table.types.inference.strategies.MappingTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.MatchFamilyTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.MissingTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.NullableIfArgsTypeStrategy;
-import 
org.apache.flink.table.types.inference.strategies.UseArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.VaryingStringTypeStrategy;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -62,7 +62,12 @@ public final class TypeStrategies {
 
     /** Type strategy that returns the n-th input argument. */
     public static TypeStrategy argument(int pos) {
-        return new UseArgumentTypeStrategy(pos);
+        return new ArgumentMappingTypeStrategy(pos, Optional::of);
+    }
+
+    /** Type strategy that returns the n-th input argument, mapping it. */
+    public static TypeStrategy argument(int pos, Function<DataType, 
Optional<DataType>> mapper) {
+        return new ArgumentMappingTypeStrategy(pos, mapper);
     }
 
     /** Type strategy that returns the first type that could be inferred. */
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java
index 3163e453b21..4dbb3d7a112 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/AnyArgumentTypeStrategy.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 
 import java.util.Optional;
 
@@ -37,9 +37,8 @@ public final class AnyArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     }
 
     @Override
-    public Signature.Argument getExpectedArgument(
-            FunctionDefinition functionDefinition, int argumentPos) {
-        return Signature.Argument.of("<ANY>");
+    public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
+        return Argument.ofGroup("ANY");
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArgumentMappingTypeStrategy.java
similarity index 75%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArgumentMappingTypeStrategy.java
index f186d7fec5f..3e96ef7cda9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArgumentMappingTypeStrategy.java
@@ -26,16 +26,20 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
-/** Type strategy that returns the n-th input argument. */
+/** Type strategy that returns the n-th input argument, mapping it with the 
provided function. */
 @Internal
-public final class UseArgumentTypeStrategy implements TypeStrategy {
+public final class ArgumentMappingTypeStrategy implements TypeStrategy {
 
     private final int pos;
+    private final Function<DataType, Optional<DataType>> mapper;
 
-    public UseArgumentTypeStrategy(int pos) {
+    public ArgumentMappingTypeStrategy(int pos, Function<DataType, 
Optional<DataType>> mapper) {
         Preconditions.checkArgument(pos >= 0);
+        Preconditions.checkNotNull(mapper);
         this.pos = pos;
+        this.mapper = mapper;
     }
 
     @Override
@@ -44,6 +48,6 @@ public final class UseArgumentTypeStrategy implements 
TypeStrategy {
         if (pos >= argumentDataTypes.size()) {
             return Optional.empty();
         }
-        return Optional.of(argumentDataTypes.get(pos));
+        return mapper.apply(argumentDataTypes.get(pos));
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java
index 92bdbe56485..fbd4bf188d7 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
 
@@ -68,11 +69,8 @@ class CastInputTypeStrategy implements InputTypeStrategy {
             return Optional.of(argumentDataTypes);
         }
         if (!supportsExplicitCast(fromType, toType)) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "Unsupported cast from '%s' to '%s'.", fromType, 
toType);
-            }
-            return Optional.empty();
+            return callContext.fail(
+                    throwOnFailure, "Unsupported cast from '%s' to '%s'.", 
fromType, toType);
         }
         return Optional.of(argumentDataTypes);
     }
@@ -80,7 +78,6 @@ class CastInputTypeStrategy implements InputTypeStrategy {
     @Override
     public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
         return Collections.singletonList(
-                Signature.of(
-                        Signature.Argument.of("<ANY>"), 
Signature.Argument.of("<TYPE LITERAL>")));
+                Signature.of(Argument.ofGroup("ANY"), Argument.ofGroup("TYPE 
LITERAL")));
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java
index a1d769137b6..b2e6f947962 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -41,7 +41,7 @@ import java.util.stream.Collectors;
 @Internal
 public final class CommonArgumentTypeStrategy implements ArgumentTypeStrategy {
 
-    private static final Signature.Argument COMMON_ARGUMENT = 
Signature.Argument.of("<COMMON>");
+    private static final Argument COMMON_ARGUMENT = Argument.ofGroup("COMMON");
 
     private final boolean preserveNullability;
 
@@ -66,8 +66,7 @@ public final class CommonArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     }
 
     @Override
-    public Signature.Argument getExpectedArgument(
-            FunctionDefinition functionDefinition, int argumentPos) {
+    public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
         return COMMON_ARGUMENT;
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java
index 6224d9fb665..47f250d73bc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonInputTypeStrategy.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.types.inference.ArgumentCount;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
@@ -41,7 +42,7 @@ import java.util.stream.IntStream;
 @Internal
 public final class CommonInputTypeStrategy implements InputTypeStrategy {
 
-    private static final Signature.Argument COMMON_ARGUMENT = 
Signature.Argument.of("<COMMON>");
+    private static final Argument COMMON_ARGUMENT = Argument.ofGroup("COMMON");
 
     private final ArgumentCount argumentCount;
 
@@ -70,11 +71,10 @@ public final class CommonInputTypeStrategy implements 
InputTypeStrategy {
         Optional<LogicalType> commonType = 
LogicalTypeMerging.findCommonType(argumentTypes);
 
         if (!commonType.isPresent()) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "Could not find a common type for arguments: %s", 
argumentDataTypes);
-            }
-            return Optional.empty();
+            return callContext.fail(
+                    throwOnFailure,
+                    "Could not find a common type for arguments: %s",
+                    argumentDataTypes);
         }
 
         return commonType.map(
@@ -96,9 +96,9 @@ public final class CommonInputTypeStrategy implements 
InputTypeStrategy {
                     .collect(Collectors.toList());
         }
 
-        List<Signature.Argument> arguments =
+        List<Argument> arguments =
                 new 
ArrayList<>(Collections.nCopies(numberOfMandatoryArguments, COMMON_ARGUMENT));
-        arguments.add(Signature.Argument.of("<COMMON>..."));
+        arguments.add(Argument.ofGroupVarying("COMMON"));
         return Collections.singletonList(Signature.of(arguments));
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
index b5e117a709a..88aa6877c3e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
@@ -79,12 +79,11 @@ public final class ComparableTypeStrategy implements 
InputTypeStrategy {
         if (argumentDataTypes.size() == 1) {
             final LogicalType argType = 
argumentDataTypes.get(0).getLogicalType();
             if (!areComparable(argType, argType)) {
-                if (throwOnFailure) {
-                    throw callContext.newValidationError(
-                            "Type '%s' should support %s comparison with 
itself.",
-                            argType, comparisonToString());
-                }
-                return Optional.empty();
+                return callContext.fail(
+                        throwOnFailure,
+                        "Type '%s' should support %s comparison with itself.",
+                        argType,
+                        comparisonToString());
             }
         } else {
             for (int i = 0; i < argumentDataTypes.size() - 1; i++) {
@@ -92,13 +91,13 @@ public final class ComparableTypeStrategy implements 
InputTypeStrategy {
                 final LogicalType secondType = argumentDataTypes.get(i + 
1).getLogicalType();
 
                 if (!areComparable(firstType, secondType)) {
-                    if (throwOnFailure) {
-                        throw callContext.newValidationError(
-                                "All types in a comparison should support %s 
comparison with each other. "
-                                        + "Can not compare %s with %s",
-                                comparisonToString(), firstType, secondType);
-                    }
-                    return Optional.empty();
+                    return callContext.fail(
+                            throwOnFailure,
+                            "All types in a comparison should support %s 
comparison with each other. "
+                                    + "Can not compare %s with %s",
+                            comparisonToString(),
+                            firstType,
+                            secondType);
                 }
             }
         }
@@ -212,7 +211,8 @@ public final class ComparableTypeStrategy implements 
InputTypeStrategy {
 
     @Override
     public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
-        return 
Collections.singletonList(Signature.of(Signature.Argument.of("<COMPARABLE>...")));
+        return Collections.singletonList(
+                Signature.of(Signature.Argument.ofGroupVarying("COMPARABLE")));
     }
 
     private Boolean hasRequiredComparison(StructuredType structuredType) {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java
index 69bbbdf5101..2f55685341e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CompositeArgumentTypeStrategy.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
 import java.util.Optional;
@@ -36,19 +36,14 @@ public class CompositeArgumentTypeStrategy implements 
ArgumentTypeStrategy {
             CallContext callContext, int argumentPos, boolean throwOnFailure) {
         DataType dataType = 
callContext.getArgumentDataTypes().get(argumentPos);
         if (!LogicalTypeChecks.isCompositeType(dataType.getLogicalType())) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "A composite type expected. Got: %s", dataType);
-            }
-            return Optional.empty();
+            return callContext.fail(throwOnFailure, "A composite type 
expected. Got: %s", dataType);
         }
 
         return Optional.of(dataType);
     }
 
     @Override
-    public Signature.Argument getExpectedArgument(
-            FunctionDefinition functionDefinition, int argumentPos) {
-        return Signature.Argument.of("<COMPOSITE>");
+    public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
+        return Argument.ofGroup("COMPOSITE");
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
index 40c83bcb0d4..6f00124131b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
@@ -53,16 +53,12 @@ public final class ConstraintArgumentTypeStrategy 
implements ArgumentTypeStrateg
         if (evaluator.test(actualDataTypes)) {
             return Optional.of(actualDataTypes.get(argumentPos));
         }
-
-        if (throwOnFailure) {
-            throw callContext.newValidationError(constraintMessage, 
actualDataTypes.toArray());
-        }
-        return Optional.empty();
+        return callContext.fail(throwOnFailure, constraintMessage, 
actualDataTypes.toArray());
     }
 
     @Override
     public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
-        return Argument.of("<CONSTRAINT>");
+        return Argument.ofGroup("CONSTRAINT");
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
index 8933dd0d5dd..5d97eef250a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CurrentWatermarkInputTypeStrategy.java
@@ -56,23 +56,17 @@ class CurrentWatermarkInputTypeStrategy implements 
InputTypeStrategy {
 
         final DataType dataType = argumentDataTypes.get(0);
         if 
(!LogicalTypeChecks.canBeTimeAttributeType(dataType.getLogicalType())) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "CURRENT_WATERMARK() must be called with a single 
rowtime attribute argument, but '%s' cannot be a time attribute.",
-                        dataType.getLogicalType().asSummaryString());
-            }
-
-            return Optional.empty();
+            return callContext.fail(
+                    throwOnFailure,
+                    "CURRENT_WATERMARK() must be called with a single rowtime 
attribute argument, but '%s' cannot be a time attribute.",
+                    dataType.getLogicalType().asSummaryString());
         }
 
         if (!LogicalTypeChecks.isRowtimeAttribute(dataType.getLogicalType())) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "The argument of CURRENT_WATERMARK() must be a rowtime 
attribute, but was '%s'.",
-                        dataType.getLogicalType().asSummaryString());
-            }
-
-            return Optional.empty();
+            return callContext.fail(
+                    throwOnFailure,
+                    "The argument of CURRENT_WATERMARK() must be a rowtime 
attribute, but was '%s'.",
+                    dataType.getLogicalType().asSummaryString());
         }
 
         return Optional.of(Collections.singletonList(dataType));
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java
index 1a2d99d7528..caba132f7b4 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExplicitArgumentTypeStrategy.java
@@ -59,19 +59,18 @@ public final class ExplicitArgumentTypeStrategy implements 
ArgumentTypeStrategy
         }
         // type coercion
         if (!supportsImplicitCast(actualType, expectedType)) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "Unsupported argument type. Expected type '%s' but 
actual type was '%s'.",
-                        expectedType, actualType);
-            }
-            return Optional.empty();
+            return callContext.fail(
+                    throwOnFailure,
+                    "Unsupported argument type. Expected type '%s' but actual 
type was '%s'.",
+                    expectedType,
+                    actualType);
         }
         return Optional.of(expectedDataType);
     }
 
     @Override
     public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
-        return Argument.of(expectedDataType.toString());
+        return Argument.of(expectedDataType.getLogicalType());
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExtractInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExtractInputTypeStrategy.java
new file mode 100644
index 00000000000..94f02142f9f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ExtractInputTypeStrategy.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
+
+/**
+ * Type strategy for EXTRACT, checking the first value is a valid literal of type {@link
+ * TimeIntervalUnit}, and that the combination of the second argument type and the interval unit is
+ * correct.
+ */
+@Internal
+class ExtractInputTypeStrategy implements InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(2);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> args = callContext.getArgumentDataTypes();
+        final LogicalType temporalArg = args.get(1).getLogicalType();
+        if (!temporalArg.isAnyOf(LogicalTypeFamily.DATETIME, 
LogicalTypeFamily.INTERVAL)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "EXTRACT requires 2nd argument to be a temporal type, but 
type is %s",
+                    temporalArg);
+        }
+        final Optional<TimeIntervalUnit> timeIntervalUnit =
+                callContext.getArgumentValue(0, TimeIntervalUnit.class);
+        if (!timeIntervalUnit.isPresent()) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "EXTRACT requires 1st argument to be a TimeIntervalUnit 
literal");
+        }
+
+        switch (timeIntervalUnit.get()) {
+            case MILLENNIUM:
+            case CENTURY:
+            case DECADE:
+            case YEAR:
+            case QUARTER:
+            case MONTH:
+            case WEEK:
+            case DAY:
+            case EPOCH:
+                return Optional.of(args);
+            case HOUR:
+            case MINUTE:
+            case SECOND:
+            case MILLISECOND:
+            case MICROSECOND:
+            case NANOSECOND:
+                if (temporalArg.isAnyOf(LogicalTypeFamily.TIME, 
LogicalTypeFamily.TIMESTAMP)
+                        || temporalArg.is(INTERVAL_DAY_TIME)) {
+                    return Optional.of(args);
+                }
+        }
+
+        return callContext.fail(
+                throwOnFailure,
+                "EXTRACT does not support TimeIntervalUnit %s for type %s",
+                timeIntervalUnit.get(),
+                temporalArg);
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+        return Collections.singletonList(
+                Signature.of(
+                        Argument.ofGroup(TimeIntervalUnit.class), 
Argument.ofGroup("TEMPORAL")));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
index 00212663e89..08470365040 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -84,12 +84,11 @@ public final class FamilyArgumentTypeStrategy implements 
ArgumentTypeStrategy {
         }
 
         if (Objects.equals(expectedNullability, Boolean.FALSE) && 
actualType.isNullable()) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "Unsupported argument type. Expected nullable type of 
family '%s' but actual type was '%s'.",
-                        expectedFamily, actualType);
-            }
-            return Optional.empty();
+            return callContext.fail(
+                    throwOnFailure,
+                    "Unsupported argument type. Expected nullable type of 
family '%s' but actual type was '%s'.",
+                    expectedFamily,
+                    actualType);
         }
 
         // type is part of the family
@@ -116,15 +115,13 @@ public final class FamilyArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     }
 
     @Override
-    public Signature.Argument getExpectedArgument(
-            FunctionDefinition functionDefinition, int argumentPos) {
-        // "< ... >" to indicate that this is not a type
+    public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
         if (Objects.equals(expectedNullability, Boolean.TRUE)) {
-            return Signature.Argument.of("<" + expectedFamily + " NULL>");
+            return Argument.ofGroup(expectedFamily + " NULL");
         } else if (Objects.equals(expectedNullability, Boolean.FALSE)) {
-            return Signature.Argument.of("<" + expectedFamily + " NOT NULL>");
+            return Argument.ofGroup(expectedFamily + " NOT NULL");
         }
-        return Signature.Argument.of("<" + expectedFamily + ">");
+        return Argument.ofGroup(expectedFamily);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java
index da9a36df513..6a41f0c7d9a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/LiteralArgumentTypeStrategy.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 
 import java.util.Objects;
 import java.util.Optional;
@@ -42,27 +42,20 @@ public final class LiteralArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     public Optional<DataType> inferArgumentType(
             CallContext callContext, int argumentPos, boolean throwOnFailure) {
         if (!callContext.isArgumentLiteral(argumentPos)) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError("Literal expected.");
-            }
-            return Optional.empty();
+            return callContext.fail(throwOnFailure, "Literal expected.");
         }
         if (callContext.isArgumentNull(argumentPos) && !allowNull) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError("Literal must not be 
NULL.");
-            }
-            return Optional.empty();
+            return callContext.fail(throwOnFailure, "Literal must not be 
NULL.");
         }
         return 
Optional.of(callContext.getArgumentDataTypes().get(argumentPos));
     }
 
     @Override
-    public Signature.Argument getExpectedArgument(
-            FunctionDefinition functionDefinition, int argumentPos) {
+    public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
         if (allowNull) {
-            return Signature.Argument.of("<LITERAL>");
+            return Argument.ofGroup("LITERAL");
         }
-        return Signature.Argument.of("<LITERAL NOT NULL>");
+        return Argument.ofGroup("LITERAL NOT NULL");
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java
index 680d78a9dcf..7b4fa1d4bfd 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/OutputArgumentTypeStrategy.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.util.Optional;
@@ -45,9 +45,8 @@ public final class OutputArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     }
 
     @Override
-    public Signature.Argument getExpectedArgument(
-            FunctionDefinition functionDefinition, int argumentPos) {
-        return Signature.Argument.of("<OUTPUT>");
+    public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
+        return Argument.ofGroup("OUTPUT");
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
index 5a79875d0e8..26799805aa1 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
@@ -59,12 +59,11 @@ public final class RootArgumentTypeStrategy implements 
ArgumentTypeStrategy {
         final LogicalType actualType = actualDataType.getLogicalType();
 
         if (Objects.equals(expectedNullability, Boolean.FALSE) && 
actualType.isNullable()) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "Unsupported argument type. Expected nullable type of 
root '%s' but actual type was '%s'.",
-                        expectedRoot, actualType);
-            }
-            return Optional.empty();
+            return callContext.fail(
+                    throwOnFailure,
+                    "Unsupported argument type. Expected nullable type of root 
'%s' but actual type was '%s'.",
+                    expectedRoot,
+                    actualType);
         }
 
         return findDataType(
@@ -75,11 +74,11 @@ public final class RootArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     public Argument getExpectedArgument(FunctionDefinition functionDefinition, 
int argumentPos) {
         // "< ... >" to indicate that this is not a type
         if (Objects.equals(expectedNullability, Boolean.TRUE)) {
-            return Argument.of("<" + expectedRoot + " NULL>");
+            return Argument.ofGroup(expectedRoot + " NULL");
         } else if (Objects.equals(expectedNullability, Boolean.FALSE)) {
-            return Argument.of("<" + expectedRoot + " NOT NULL>");
+            return Argument.ofGroup(expectedRoot + " NOT NULL");
         }
-        return Argument.of("<" + expectedRoot + ">");
+        return Argument.ofGroup(expectedRoot);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index d9bd4fa3f0f..81259ca175d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -83,6 +83,13 @@ public final class SpecificInputTypeStrategies {
                                     
and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
                                     JSON_ARGUMENT));
 
+    /** See {@link ExtractInputTypeStrategy}. */
+    public static final InputTypeStrategy EXTRACT = new 
ExtractInputTypeStrategy();
+
+    /** See {@link TemporalOverlapsInputTypeStrategy}. */
+    public static final InputTypeStrategy TEMPORAL_OVERLAPS =
+            new TemporalOverlapsInputTypeStrategy();
+
     // --------------------------------------------------------------------------------------------
     // Strategies composed of other strategies
     // --------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
index d48fc8b1aef..046a618c3a8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
@@ -80,6 +80,9 @@ public final class SpecificTypeStrategies {
     public static final TypeStrategy INTERNAL_REPLICATE_ROWS =
             new InternalReplicateRowsTypeStrategy();
 
+    /** See {@link ToTimestampLtzTypeStrategy}. */
+    public static final TypeStrategy TO_TIMESTAMP_LTZ = new 
ToTimestampLtzTypeStrategy();
+
     private SpecificTypeStrategies() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java
index d8df98e201d..c847042c3a0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SubsequenceInputTypeStrategy.java
@@ -77,12 +77,11 @@ public final class SubsequenceInputTypeStrategy implements 
InputTypeStrategy {
             if (splitDataTypes.isPresent()) {
                 result.addAll(splitDataTypes.get());
             } else {
-                if (throwOnFailure) {
-                    throw callContext.newValidationError(
-                            "Could not infer arguments in range: [%d, %d].",
-                            argumentsSplit.startIndex, 
argumentsSplit.endIndex);
-                }
-                return Optional.empty();
+                return callContext.fail(
+                        throwOnFailure,
+                        "Could not infer arguments in range: [%d, %d].",
+                        argumentsSplit.startIndex,
+                        argumentsSplit.endIndex);
             }
         }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java
index 9e6b471f8b7..9a68ed7af64 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SymbolArgumentTypeStrategy.java
@@ -27,17 +27,27 @@ import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.Signature;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 
 /** Strategy for a symbol argument of a specific {@link TableSymbol} enum. */
 @Internal
-public class SymbolArgumentTypeStrategy implements ArgumentTypeStrategy {
+public class SymbolArgumentTypeStrategy<T extends Enum<? extends TableSymbol>>
+        implements ArgumentTypeStrategy {
 
-    private final Class<? extends Enum<? extends TableSymbol>> symbolClass;
+    private final Class<T> symbolClass;
+    private final Set<T> allowedVariants;
 
-    public SymbolArgumentTypeStrategy(Class<? extends Enum<? extends 
TableSymbol>> symbolClass) {
+    public SymbolArgumentTypeStrategy(Class<T> symbolClass) {
+        this(symbolClass, new 
HashSet<>(Arrays.asList(symbolClass.getEnumConstants())));
+    }
+
+    public SymbolArgumentTypeStrategy(Class<T> symbolClass, Set<T> 
allowedVariants) {
         this.symbolClass = symbolClass;
+        this.allowedVariants = allowedVariants;
     }
 
     @Override
@@ -46,27 +56,30 @@ public class SymbolArgumentTypeStrategy implements 
ArgumentTypeStrategy {
         final DataType argumentType = 
callContext.getArgumentDataTypes().get(argumentPos);
         if (argumentType.getLogicalType().getTypeRoot() != 
LogicalTypeRoot.SYMBOL
                 || !callContext.isArgumentLiteral(argumentPos)) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "Unsupported argument type. Expected symbol type '%s' 
but actual type was '%s'.",
-                        symbolClass.getSimpleName(), argumentType);
-            } else {
-                return Optional.empty();
-            }
+            return callContext.fail(
+                    throwOnFailure,
+                    "Unsupported argument type. Expected symbol type '%s' but 
actual type was '%s'.",
+                    symbolClass.getSimpleName(),
+                    argumentType);
         }
 
-        if (!callContext.getArgumentValue(argumentPos, 
symbolClass).isPresent()) {
-            if (throwOnFailure) {
-                throw callContext.newValidationError(
-                        "Unsupported argument symbol type. Expected symbol 
'%s' but actual symbol was %s.",
-                        symbolClass.getSimpleName(),
-                        callContext
-                                .getArgumentValue(argumentPos, Enum.class)
-                                .map(e -> "'" + e.getClass().getSimpleName() + 
"'")
-                                .orElse("invalid"));
-            } else {
-                return Optional.empty();
-            }
+        Optional<T> val = callContext.getArgumentValue(argumentPos, 
symbolClass);
+        if (!val.isPresent()) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Unsupported argument symbol type. Expected symbol '%s' 
but actual symbol was %s.",
+                    symbolClass.getSimpleName(),
+                    callContext
+                            .getArgumentValue(argumentPos, Enum.class)
+                            .map(e -> "'" + e.getClass().getSimpleName() + "'")
+                            .orElse("invalid"));
+        }
+        if (!this.allowedVariants.contains(val.get())) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Unsupported argument symbol variant. Expected one of the 
following variants %s but actual symbol was %s.",
+                    this.allowedVariants,
+                    val.get());
         }
 
         return Optional.of(argumentType);
@@ -75,7 +88,7 @@ public class SymbolArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     @Override
     public Signature.Argument getExpectedArgument(
             FunctionDefinition functionDefinition, int argumentPos) {
-        return Signature.Argument.of(String.format("<%s>", 
symbolClass.getSimpleName()));
+        return Signature.Argument.ofGroup(symbolClass);
     }
 
     // 
---------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TemporalOverlapsInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TemporalOverlapsInputTypeStrategy.java
new file mode 100644
index 00000000000..92bda0ca15c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TemporalOverlapsInputTypeStrategy.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Input type strategy of {@code TEMPORAL_OVERLAPS}. */
+@Internal
+class TemporalOverlapsInputTypeStrategy implements InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(4);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> args = callContext.getArgumentDataTypes();
+        final LogicalType leftTimePoint = args.get(0).getLogicalType();
+        final LogicalType leftTemporal = args.get(1).getLogicalType();
+        final LogicalType rightTimePoint = args.get(2).getLogicalType();
+        final LogicalType rightTemporal = args.get(3).getLogicalType();
+
+        if (!leftTimePoint.is(LogicalTypeFamily.DATETIME)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "TEMPORAL_OVERLAPS requires 1st argument 'leftTimePoint' 
to be a DATETIME type, but is %s",
+                    leftTimePoint);
+        }
+        if (!rightTimePoint.is(LogicalTypeFamily.DATETIME)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "TEMPORAL_OVERLAPS requires 3rd argument 'rightTimePoint' 
to be a DATETIME type, but is %s",
+                    rightTimePoint);
+        }
+
+        if (!leftTimePoint.equals(rightTimePoint)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "TEMPORAL_OVERLAPS requires 'leftTimePoint' and 
'rightTimePoint' arguments to be of the same type, but is %s != %s",
+                    leftTimePoint,
+                    rightTimePoint);
+        }
+
+        // if leftTemporal is a time point, it must have the same type as leftTimePoint
+        if (leftTemporal.is(LogicalTypeFamily.DATETIME)) {
+            if (!leftTemporal.equals(leftTimePoint)) {
+                return callContext.fail(
+                        throwOnFailure,
+                        "TEMPORAL_OVERLAPS requires 'leftTemporal' and 
'leftTimePoint' arguments to be of the same type if 'leftTemporal' is a 
DATETIME, but is %s != %s",
+                        leftTemporal,
+                        leftTimePoint);
+            }
+        } else if (!leftTemporal.is(LogicalTypeFamily.INTERVAL)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "TEMPORAL_OVERLAPS requires 2nd argument 'leftTemporal' to 
be DATETIME or INTERVAL type, but is %s",
+                    leftTemporal);
+        }
+
+        // if rightTemporal is a time point, it must have the same type as 
rightTimePoint
+        if (rightTemporal.is(LogicalTypeFamily.DATETIME)) {
+            if (!rightTemporal.equals(rightTimePoint)) {
+                return callContext.fail(
+                        throwOnFailure,
+                        "TEMPORAL_OVERLAPS requires 'rightTemporal' and 
'rightTimePoint' arguments to be of the same type if 'rightTemporal' is a 
DATETIME, but is %s != %s",
+                        rightTemporal,
+                        rightTimePoint);
+            }
+        } else if (!rightTemporal.is(LogicalTypeFamily.INTERVAL)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "TEMPORAL_OVERLAPS requires 4th argument 'rightTemporal' 
to be DATETIME or INTERVAL type, but is %s",
+                    rightTemporal);
+        }
+
+        return Optional.of(args);
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+        return Collections.singletonList(
+                Signature.of(
+                        Argument.ofGroup("leftTimePoint", 
LogicalTypeFamily.DATETIME),
+                        Argument.ofGroup("leftTemporal", "TEMPORAL"),
+                        Argument.ofGroup("rightTimePoint", 
LogicalTypeFamily.DATETIME),
+                        Argument.ofGroup("rightTemporal", "TEMPORAL")));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
similarity index 68%
rename from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
index f186d7fec5f..722dd63e51d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UseArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
@@ -19,31 +19,23 @@
 package org.apache.flink.table.types.inference.strategies;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeStrategy;
-import org.apache.flink.util.Preconditions;
 
-import java.util.List;
 import java.util.Optional;
 
-/** Type strategy that returns the n-th input argument. */
+/** Type strategy of {@code TO_TIMESTAMP_LTZ}. */
 @Internal
-public final class UseArgumentTypeStrategy implements TypeStrategy {
-
-    private final int pos;
-
-    public UseArgumentTypeStrategy(int pos) {
-        Preconditions.checkArgument(pos >= 0);
-        this.pos = pos;
-    }
+public class ToTimestampLtzTypeStrategy implements TypeStrategy {
 
     @Override
     public Optional<DataType> inferType(CallContext callContext) {
-        final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
-        if (pos >= argumentDataTypes.size()) {
-            return Optional.empty();
+        if (callContext.isArgumentLiteral(1)) {
+            final int precision = callContext.getArgumentValue(1, 
Integer.class).get();
+            return Optional.of(DataTypes.TIMESTAMP_LTZ(precision));
         }
-        return Optional.of(argumentDataTypes.get(pos));
+        return Optional.of(DataTypes.TIMESTAMP_LTZ(3));
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java
index e1d74a2ebe5..a5b09261eb5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/TypeLiteralArgumentTypeStrategy.java
@@ -46,6 +46,6 @@ public class TypeLiteralArgumentTypeStrategy implements 
ArgumentTypeStrategy {
     @Override
     public Signature.Argument getExpectedArgument(
             FunctionDefinition functionDefinition, int argumentPos) {
-        return Signature.Argument.of("<DATA TYPE NOT NULL>");
+        return Signature.Argument.ofGroup("DATA TYPE NOT NULL");
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java
index 2755bf1d7a8..37b909015be 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/VaryingSequenceInputTypeStrategy.java
@@ -102,9 +102,9 @@ public final class VaryingSequenceInputTypeStrategy 
implements InputTypeStrategy
         final Signature.Argument newArg;
         final String type = varyingArgument.getType();
         if (argumentNames == null) {
-            newArg = Signature.Argument.of(type + "...");
+            newArg = Signature.Argument.ofVarying(type);
         } else {
-            newArg = 
Signature.Argument.of(argumentNames.get(constantArgumentCount), type + "...");
+            newArg = 
Signature.Argument.ofVarying(argumentNames.get(constantArgumentCount), type);
         }
 
         final List<Signature.Argument> arguments = new ArrayList<>();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 008a05a0ae1..a79fefacbde 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -95,6 +95,16 @@ public abstract class LogicalType implements Serializable {
         return Arrays.stream(typeRoots).anyMatch(tr -> this.typeRoot == tr);
     }
 
+    /**
+     * Returns whether the root of the type is part of at least one of 
the given {@code typeFamilies}
+     * or not.
+     *
+     * @param typeFamilies The families to check against for equality
+     */
+    public boolean isAnyOf(LogicalTypeFamily... typeFamilies) {
+        return Arrays.stream(typeFamilies).anyMatch(tf -> 
this.typeRoot.getFamilies().contains(tf));
+    }
+
     /**
      * Returns whether the family type of the type equals to the {@code 
family} or not.
      *
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/ExtractFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
similarity index 65%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/ExtractFunctionITCase.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
index deaebadd8be..28434e009a5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/ExtractFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
@@ -18,26 +18,41 @@
 
 package org.apache.flink.table.planner.functions;
 
+import org.apache.flink.table.api.JsonExistsOnError;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
 import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.HOUR;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.TIME;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.temporalOverlaps;
 
-/** Test {@link BuiltInFunctionDefinitions#EXTRACT} and its return type. */
-class ExtractFunctionITCase extends BuiltInFunctionTestBase {
+/** Test time-related built-in functions. */
+class TimeFunctionsITCase extends BuiltInFunctionTestBase {
 
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
+        return Stream.concat(extractTestCases(), temporalOverlapsTestCases());
+    }
+
+    private Stream<TestSetSpec> extractTestCases() {
         return Stream.of(
                 TestSetSpec.forFunction(BuiltInFunctionDefinitions.EXTRACT)
                         .onFieldsWithData(
@@ -45,9 +60,15 @@ class ExtractFunctionITCase extends BuiltInFunctionTestBase {
                                 LocalDateTime.of(2020, 2, 29, 1, 56, 59, 
987654321),
                                 null,
                                 LocalDate.of(1990, 10, 14),
-                                Instant.ofEpochMilli(100000012))
+                                Instant.ofEpochMilli(100000012),
+                                true)
                         .andDataTypes(
-                                TIMESTAMP(), TIMESTAMP(), TIMESTAMP(), DATE(), 
TIMESTAMP_LTZ(3))
+                                TIMESTAMP(),
+                                TIMESTAMP(),
+                                TIMESTAMP(),
+                                DATE(),
+                                TIMESTAMP_LTZ(3),
+                                BOOLEAN())
                         .testResult(
                                 $("f0").extract(TimeIntervalUnit.NANOSECOND),
                                 "EXTRACT(NANOSECOND FROM f0)",
@@ -65,7 +86,11 @@ class ExtractFunctionITCase extends BuiltInFunctionTestBase {
                                 BIGINT().nullable())
                         .testSqlValidationError(
                                 "EXTRACT(NANOSECOND FROM f3)", "NANOSECOND can 
not be applied")
-                        .testSqlResult("EXTRACT(NANOSECOND FROM f4)", 
12000000L, BIGINT())
+                        .testResult(
+                                $("f4").extract(TimeIntervalUnit.NANOSECOND),
+                                "EXTRACT(NANOSECOND FROM f4)",
+                                12000000L,
+                                BIGINT())
                         .testResult(
                                 $("f0").extract(TimeIntervalUnit.MICROSECOND),
                                 "EXTRACT(MICROSECOND FROM f0)",
@@ -96,8 +121,11 @@ class ExtractFunctionITCase extends BuiltInFunctionTestBase 
{
                                 "EXTRACT(MILLISECOND FROM f2)",
                                 null,
                                 BIGINT().nullable())
-                        // Table API does not support this yet (see 
FLINK-13785)
-                        .testSqlResult("EXTRACT(MILLISECOND FROM f4)", 12L, 
BIGINT().nullable())
+                        .testResult(
+                                $("f4").extract(TimeIntervalUnit.MILLISECOND),
+                                "EXTRACT(MILLISECOND FROM f4)",
+                                12L,
+                                BIGINT().nullable())
                         .testResult(
                                 $("f0").extract(TimeIntervalUnit.SECOND),
                                 "EXTRACT(SECOND FROM f0)",
@@ -294,6 +322,107 @@ class ExtractFunctionITCase extends 
BuiltInFunctionTestBase {
                                 call("EXTRACT", TimeIntervalUnit.EPOCH, 
$("f2")),
                                 "EXTRACT(EPOCH FROM f2)",
                                 null,
-                                BIGINT().nullable()));
+                                BIGINT().nullable())
+                        .testTableApiValidationError(
+                                call("EXTRACT", TimeIntervalUnit.EPOCH, 
$("f5")),
+                                "EXTRACT requires 2nd argument to be a 
temporal type, but type is BOOLEAN")
+                        .testTableApiValidationError(
+                                call("EXTRACT", JsonExistsOnError.ERROR, 
$("f2")),
+                                "EXTRACT requires 1st argument to be a 
TimeIntervalUnit literal"));
+    }
+
+    private Stream<TestSetSpec> temporalOverlapsTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS)
+                        .onFieldsWithData(
+                                LocalTime.of(2, 55, 0),
+                                Duration.ofHours(1),
+                                LocalTime.of(3, 30, 0),
+                                Duration.ofHours(2))
+                        .andDataTypes(TIME(), INTERVAL(HOUR()), TIME(), 
INTERVAL(HOUR()))
+                        .testResult(
+                                temporalOverlaps($("f0"), $("f1"), $("f2"), 
$("f3")),
+                                "(f0, f1) OVERLAPS (f2, f3)",
+                                true,
+                                BOOLEAN()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS)
+                        .onFieldsWithData(
+                                LocalTime.of(9, 0, 0),
+                                LocalTime.of(9, 30, 0),
+                                LocalTime.of(9, 29, 0),
+                                LocalTime.of(9, 31, 0),
+                                LocalTime.of(10, 0, 0),
+                                LocalTime.of(10, 15, 0),
+                                Duration.ofHours(3))
+                        .andDataTypes(
+                                TIME(), TIME(), TIME(), TIME(), TIME(), 
TIME(), INTERVAL(HOUR()))
+                        .testResult(
+                                temporalOverlaps($("f0"), $("f1"), $("f2"), 
$("f3")),
+                                "(f0, f1) OVERLAPS (f2, f3)",
+                                true,
+                                BOOLEAN())
+                        .testResult(
+                                temporalOverlaps($("f0"), $("f4"), $("f5"), 
$("f6")),
+                                "(f0, f4) OVERLAPS (f5, f6)",
+                                false,
+                                BOOLEAN()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS)
+                        .onFieldsWithData(
+                                LocalDate.of(2011, 3, 10),
+                                Duration.ofDays(10),
+                                LocalDate.of(2011, 3, 19),
+                                Duration.ofDays(10))
+                        .andDataTypes(DATE(), INTERVAL(DAY()), DATE(), 
INTERVAL(DAY()))
+                        .testResult(
+                                temporalOverlaps($("f0"), $("f1"), $("f2"), 
$("f3")),
+                                "(f0, f1) OVERLAPS (f2, f3)",
+                                true,
+                                BOOLEAN()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS)
+                        .onFieldsWithData(
+                                LocalDateTime.of(2011, 3, 10, 5, 2, 2),
+                                Duration.ofSeconds(0),
+                                LocalDateTime.of(2011, 3, 10, 5, 2, 2),
+                                LocalDateTime.of(2011, 3, 10, 5, 2, 1),
+                                LocalDateTime.of(2011, 3, 10, 5, 2, 2, 
1000000),
+                                LocalDateTime.of(2011, 3, 10, 5, 2, 2, 
2000000))
+                        .andDataTypes(
+                                TIMESTAMP(),
+                                INTERVAL(SECOND()),
+                                TIMESTAMP(),
+                                TIMESTAMP(),
+                                TIMESTAMP(),
+                                TIMESTAMP())
+                        .testResult(
+                                temporalOverlaps($("f0"), $("f1"), $("f2"), 
$("f3")),
+                                "(f0, f1) OVERLAPS (f2, f3)",
+                                true,
+                                BOOLEAN())
+                        .testResult(
+                                temporalOverlaps($("f4"), $("f1"), $("f5"), 
$("f5")),
+                                "(f4, f1) OVERLAPS (f5, f5)",
+                                false,
+                                BOOLEAN()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS)
+                        .onFieldsWithData(
+                                1,
+                                LocalDateTime.of(2011, 3, 10, 5, 2, 2),
+                                LocalDate.of(2011, 3, 10))
+                        .andDataTypes(INT(), TIMESTAMP(), DATE())
+                        .testTableApiValidationError(
+                                temporalOverlaps($("f0"), $("f1"), $("f1"), 
$("f1")),
+                                "TEMPORAL_OVERLAPS requires 1st argument 
'leftTimePoint' to be a DATETIME type, but is INT")
+                        .testTableApiValidationError(
+                                temporalOverlaps($("f1"), $("f1"), $("f0"), 
$("f1")),
+                                "TEMPORAL_OVERLAPS requires 3rd argument 
'rightTimePoint' to be a DATETIME type, but is INT")
+                        .testTableApiValidationError(
+                                temporalOverlaps($("f1"), $("f1"), $("f2"), 
$("f2")),
+                                "TEMPORAL_OVERLAPS requires 'leftTimePoint' 
and 'rightTimePoint' arguments to be of the same type, but is TIMESTAMP(6) != 
DATE")
+                        .testTableApiValidationError(
+                                temporalOverlaps($("f2"), $("f1"), $("f2"), 
$("f2")),
+                                "TEMPORAL_OVERLAPS requires 'leftTemporal' and 
'leftTimePoint' arguments to be of the same type if 'leftTemporal' is a 
DATETIME, but is TIMESTAMP(6) != DATE")
+                        .testTableApiValidationError(
+                                temporalOverlaps($("f1"), $("f0"), $("f1"), 
$("f0")),
+                                "TEMPORAL_OVERLAPS requires 2nd argument 
'leftTemporal' to be DATETIME or INTERVAL type, but is INT"));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 93d9e186b00..953c5612446 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -240,9 +240,6 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("FLOOR(TIME '12:44:31' TO MINUTE)", "12:44:00")
     testSqlApi("CEIL(TIME '12:44:31' TO MINUTE)", "12:45:00")
     testSqlApi("QUARTER(DATE '2016-04-12')", "2")
-    testSqlApi(
-      "(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL 
'2' HOUR)",
-      "TRUE")
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 68fcf093fd5..4761ef7c0a4 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -1272,8 +1272,8 @@ class TemporalTypesTest extends ExpressionTestBase {
     )
     testExpectedTableApiException(
       toTimestampLtz("test_string_type", 0),
-      "toTimestampLtz(test_string_type, 0) requires numeric type for the first 
input," +
-        " but the actual type 'String'."
+      "Unsupported argument type. " +
+        "Expected type of family 'NUMERIC' but actual type was 'CHAR(16) NOT 
NULL'"
     )
 
     // invalid type for the second input
@@ -1286,8 +1286,8 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testExpectedTableApiException(
       toTimestampLtz(123, "test_string_type"),
-      "toTimestampLtz(123, test_string_type) requires numeric type for the 
second input," +
-        " but the actual type 'String'."
+      "Unsupported argument type. " +
+        "Expected type of family 'INTEGER_NUMERIC' but actual type was 
'CHAR(16) NOT NULL'"
     )
   }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index b5ddd2dad00..a5105607f83 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.{MapFunction, 
RichFunction, RichMap
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.testutils.FlinkAssertions
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api
 import org.apache.flink.table.api.{EnvironmentSettings, TableException, 
ValidationException}
@@ -49,6 +50,7 @@ import org.apache.calcite.rel.logical.LogicalCalc
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
+import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.{After, Before, Rule}
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.rules.ExpectedException
@@ -153,23 +155,18 @@ abstract class ExpressionTestBase {
 
     invalidTableApiExprs.foreach {
       case (tableExpr, keywords, clazz) => {
-        try {
-          val invalidExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
-          addTableApiTestExpr(tableExpr, keywords, invalidExprs, clazz)
-          evaluateGivenExprs(invalidExprs)
-          fail(s"Expected a $clazz, but no exception is thrown.")
-        } catch {
-          case e if e.getClass == clazz =>
-            if (keywords != null) {
-              assertTrue(
-                s"The actual exception message \n${e.getMessage}\n" +
-                  s"doesn't contain expected keyword \n$keywords\n",
-                e.getMessage.contains(keywords))
-            }
-          case e: Throwable =>
-            e.printStackTrace()
-            fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
+        val assertion = if (keywords != null) {
+          FlinkAssertions.anyCauseMatches(clazz, keywords)
+        } else {
+          FlinkAssertions.anyCauseMatches(clazz)
         }
+
+        assertThatThrownBy(
+          () => {
+            val invalidExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
+            addTableApiTestExpr(tableExpr, keywords, invalidExprs, clazz)
+            evaluateGivenExprs(invalidExprs)
+          }).satisfies(assertion)
       }
     }
   }

Reply via email to