yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1905955530


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java:
##########
@@ -20,22 +20,79 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import java.util.List;
 import java.util.Optional;
 
 /** Type strategy of {@code TO_TIMESTAMP_LTZ}. */
 @Internal
 public class ToTimestampLtzTypeStrategy implements TypeStrategy {
 
+    private static final int DEFAULT_PRECISION = 3;
+
     @Override
     public Optional<DataType> inferType(CallContext callContext) {
-        if (callContext.isArgumentLiteral(1)) {
-            final int precision = callContext.getArgumentValue(1, 
Integer.class).get();
-            return Optional.of(DataTypes.TIMESTAMP_LTZ(precision));
+        List<DataType> argumentTypes = callContext.getArgumentDataTypes();
+        int argCount = argumentTypes.size();
+
+        if (argCount < 1 || argCount > 3) {
+            throw new ValidationException(
+                    "Unsupported argument type. "
+                            + "TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but 
"
+                            + argCount
+                            + " were provided.");
+        }
+
+        LogicalType firstType = argumentTypes.get(0).getLogicalType();
+        LogicalTypeRoot firstTypeRoot = firstType.getTypeRoot();
+
+        if (argCount == 1) {
+            if (!isCharacterType(firstTypeRoot) && 
!firstType.is(LogicalTypeFamily.NUMERIC)) {
+                throw new ValidationException(
+                        "Unsupported argument type. "
+                                + "When taking 1 argument, TO_TIMESTAMP_LTZ 
accepts <CHARACTER> or <NUMERIC>.");
+            }
+        } else if (argCount == 2) {
+            LogicalType secondType = argumentTypes.get(1).getLogicalType();
+            LogicalTypeRoot secondTypeRoot = secondType.getTypeRoot();
+            if (firstType.is(LogicalTypeFamily.NUMERIC)) {
+                if (secondTypeRoot != LogicalTypeRoot.INTEGER) {
+                    throw new ValidationException(
+                            "Unsupported argument type. "
+                                    + "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) 
requires the second argument to be <INTEGER>.");
+                }
+            } else if (isCharacterType(firstTypeRoot)) {
+                if (!isCharacterType(secondTypeRoot)) {
+                    throw new ValidationException(
+                            "Unsupported argument type. "
+                                    + "TO_TIMESTAMP_LTZ(<CHARACTER>, 
<CHARACTER>) requires the second argument to be <CHARACTER>.");
+                }
+            } else {
+                throw new ValidationException(
+                        "Unsupported argument type. "
+                                + "When taking 2 arguments, TO_TIMESTAMP_LTZ 
requires the first argument to be <NUMERIC> or <CHARACTER>.");
+            }
+        } else if (argCount == 3) {
+            if (!isCharacterType(firstTypeRoot)
+                    || 
!isCharacterType(argumentTypes.get(1).getLogicalType().getTypeRoot())
+                    || 
!isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) {
+                throw new ValidationException(
+                        "Unsupported argument type. "
+                                + "When taking 3 arguments, TO_TIMESTAMP_LTZ 
requires all three arguments to be of type <CHARACTER>.");

Review Comment:
   Addressed; I also modified the unit tests to reflect the change.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java:
##########
@@ -366,6 +366,61 @@ public static ApiExpression toTimestampLtz(Object 
numericEpochTime, Object preci
         return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
numericEpochTime, precision);
     }
 
+    /**
+     * Converts the given time string with the specified format to {@link
+     * DataTypes#TIMESTAMP_LTZ(int)}.
+     *
+     * @param timestampStr The timestamp string to convert.
+     * @param format The format of the string.
+     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} 
type.
+     */
+    public static ApiExpression toTimestampLtz(String timestampStr, String 
format) {
+        return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
timestampStr, format);
+    }
+
+    /**
+     * Converts a timestamp to {@link DataTypes#TIMESTAMP_LTZ(int)}.
+     *
+     * <p>This method takes an object representing a timestamp and converts it 
to a TIMESTAMP_LTZ

Review Comment:
   Addressed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to