This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 25493ac2a95 [FLINK-39244][table] Support precision 0-9 for 
TO_TIMESTAMP_LTZ function
25493ac2a95 is described below

commit 25493ac2a95b52421631293705c24fceb40b51f4
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu Mar 19 15:02:23 2026 +0100

    [FLINK-39244][table] Support precision 0-9 for TO_TIMESTAMP_LTZ function
    
    This closes #27757.
---
 docs/data/sql_functions.yml                        |  35 +++++-
 docs/data/sql_functions_zh.yml                     |  35 +++++-
 flink-python/pyflink/table/expressions.py          |  60 +++++----
 .../table/expressions/ValueLiteralExpression.java  |  52 +++++++-
 .../strategies/ToTimestampLtzTypeStrategy.java     |  96 ++++++++++-----
 .../apache/flink/table/utils/DateTimeUtils.java    | 137 +++++++++++----------
 .../flink/table/expressions/ExpressionTest.java    |  53 ++++++++
 .../strategies/ToTimestampLtzTypeStrategyTest.java |  76 +++++++++++-
 .../LiteralExpressionsSerializationITCase.java     |   9 ++
 .../planner/functions/TimeFunctionsITCase.java     |  33 +++--
 .../planner/expressions/TemporalTypesTest.scala    |  18 ---
 .../functions/scalar/ToTimestampLtzFunction.java   |  11 +-
 12 files changed, 458 insertions(+), 157 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 0be8e6c9ca2..f8f3e199e93 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -686,11 +686,40 @@ temporal:
     table: toDate(STRING1[, STRING2])
     description: Converts a date string string1 with format string2 (by 
default 'yyyy-MM-dd') to a date.
   - sql: TO_TIMESTAMP_LTZ(numeric[, precision])
-    table: toTimestampLtz(NUMERIC, PRECISION)
-    description: Converts an epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents 
TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents 
TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the 
default precision is 3. If any input is null, the function will return null.
+    table: toTimestampLtz(NUMERIC[, PRECISION])
+    description: |
+      Converts a numeric epoch value to a TIMESTAMP_LTZ.
+
+      - numeric: the epoch value to convert. The meaning of this value depends 
on the precision parameter.
+      - precision: an integer (0-9) that determines the unit of the numeric 
value (default 3). The precision changes how the numeric input is interpreted:
+
+        - TO_TIMESTAMP_LTZ(epochSeconds, 0) interprets the value as seconds 
since epoch
+        - TO_TIMESTAMP_LTZ(epochMillis, 3) interprets the value as 
milliseconds since epoch
+        - TO_TIMESTAMP_LTZ(epochMicros, 6) interprets the value as 
microseconds since epoch
+        - TO_TIMESTAMP_LTZ(epochNanos, 9) interprets the value as nanoseconds 
since epoch
+
+      Other precision values between 0 and 9 are also supported, where the 
numeric value represents units of 10^(-precision) seconds.
+      The output type is TIMESTAMP_LTZ(3) for precision 0-3, and 
TIMESTAMP_LTZ(precision) for precision 4-9.
+      Returns NULL if any input is NULL.
+
+      E.g., TO_TIMESTAMP_LTZ(1234567890, 0) returns TIMESTAMP_LTZ(3) 
'2009-02-13 23:31:30.000',
+      TO_TIMESTAMP_LTZ(1234567890123, 3) returns TIMESTAMP_LTZ(3) '2009-02-13 
23:31:30.123',
+      TO_TIMESTAMP_LTZ(1234567890123456789, 9) returns TIMESTAMP_LTZ(9) 
'2009-02-13 23:31:30.123456789'.
   - sql: TO_TIMESTAMP_LTZ(string1[, string2[, string3]])
     table: toTimestampLtz(STRING1[, STRING2[, STRING3]])
-    description: Converts a timestamp string string1 with format string2 (by 
default 'yyyy-MM-dd HH:mm:ss.SSS') in time zone string3 (by default 'UTC') to a 
TIMESTAMP_LTZ. If any input is null, the function will return null.
+    description: |
+      Converts a timestamp string to a TIMESTAMP_LTZ.
+
+      - string1: the timestamp string to parse
+      - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The 
pattern follows Java's DateTimeFormatter syntax, where 'S' represents 
fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
+      - string3: the time zone of the input string (default 'UTC'). Supports 
zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'.
+
+      The output precision is inferred from the number of 'S' characters in 
the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' 
returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns 
TIMESTAMP_LTZ(6).
+      Returns NULL if any input is NULL.
+
+      E.g., TO_TIMESTAMP_LTZ('2023-01-01 00:00:00') parses using default 
format and UTC,
+      TO_TIMESTAMP_LTZ('2023-01-01 12:30:00.123456789', 'yyyy-MM-dd 
HH:mm:ss.SSSSSSSSS') parses with nanosecond precision,
+      TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 
'Asia/Shanghai') parses in Shanghai time zone.
   - sql: TO_TIMESTAMP(string1[, string2])
     table: toTimestamp(STRING1[, STRING2])
     description: "Converts date time string string1 with format string2 (by 
default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone."
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 1c70ff12d11..35a546f6d2e 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -812,11 +812,40 @@ temporal:
     table: toDate(STRING1[, STRING2])
     description: 将格式为 string2(默认为 'yyyy-MM-dd')的字符串 string1 转换为日期。
   - sql: TO_TIMESTAMP_LTZ(numeric[, precision])
-    table: toTimestampLtz(NUMERIC, PRECISION)
-    description: Converts an epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents 
TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents 
TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the 
default precision is 3. If any input is null, the function will return null.
+    table: toTimestampLtz(NUMERIC[, PRECISION])
+    description: |
+      Converts a numeric epoch value to a TIMESTAMP_LTZ.
+
+      - numeric: the epoch value to convert. The meaning of this value depends 
on the precision parameter.
+      - precision: an integer (0-9) that determines the unit of the numeric 
value (default 3). The precision changes how the numeric input is interpreted:
+
+        - TO_TIMESTAMP_LTZ(epochSeconds, 0) interprets the value as seconds 
since epoch
+        - TO_TIMESTAMP_LTZ(epochMillis, 3) interprets the value as 
milliseconds since epoch
+        - TO_TIMESTAMP_LTZ(epochMicros, 6) interprets the value as 
microseconds since epoch
+        - TO_TIMESTAMP_LTZ(epochNanos, 9) interprets the value as nanoseconds 
since epoch
+
+      Other precision values between 0 and 9 are also supported, where the 
numeric value represents units of 10^(-precision) seconds.
+      The output type is TIMESTAMP_LTZ(3) for precision 0-3, and 
TIMESTAMP_LTZ(precision) for precision 4-9.
+      Returns NULL if any input is NULL.
+
+      E.g., TO_TIMESTAMP_LTZ(1234567890, 0) returns TIMESTAMP_LTZ(3) 
'2009-02-13 23:31:30.000',
+      TO_TIMESTAMP_LTZ(1234567890123, 3) returns TIMESTAMP_LTZ(3) '2009-02-13 
23:31:30.123',
+      TO_TIMESTAMP_LTZ(1234567890123456789, 9) returns TIMESTAMP_LTZ(9) 
'2009-02-13 23:31:30.123456789'.
   - sql: TO_TIMESTAMP_LTZ(string1[, string2[, string3]])
     table: toTimestampLtz(STRING1[, STRING2[, STRING3]])
-    description: Converts a timestamp string string1 with format string2 (by 
default 'yyyy-MM-dd HH:mm:ss.SSS') in time zone string3 (by default 'UTC') to a 
TIMESTAMP_LTZ. If any input is null, the function will return null.
+    description: |
+      Converts a timestamp string to a TIMESTAMP_LTZ.
+
+      - string1: the timestamp string to parse
+      - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The 
pattern follows Java's DateTimeFormatter syntax, where 'S' represents 
fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
+      - string3: the time zone of the input string (default 'UTC'). Supports 
zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'.
+
+      The output precision is inferred from the number of 'S' characters in 
the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' 
returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns 
TIMESTAMP_LTZ(6).
+      Returns NULL if any input is NULL.
+
+      E.g., TO_TIMESTAMP_LTZ('2023-01-01 00:00:00') parses using default 
format and UTC,
+      TO_TIMESTAMP_LTZ('2023-01-01 12:30:00.123456789', 'yyyy-MM-dd 
HH:mm:ss.SSSSSSSSS') parses with nanosecond precision,
+      TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 
'Asia/Shanghai') parses in Shanghai time zone.
   - sql: TO_TIMESTAMP(string1[, string2])
     table: toTimestamp(STRING1[, STRING2])
     description: 将格式为 string2(默认为:'yyyy-MM-dd HH:mm:ss')的字符串 string1 转换为 
timestamp,不带时区。
diff --git a/flink-python/pyflink/table/expressions.py 
b/flink-python/pyflink/table/expressions.py
index 00d8cec5f4a..daf73ae0c21 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -325,34 +325,52 @@ def to_timestamp(timestamp_str: Union[str, 
Expression[str]],
 @PublicEvolving()
 def to_timestamp_ltz(*args) -> Expression:
     """
-    Converts a value to a timestamp with local time zone.
-
-    Supported functions:
-    1. to_timestamp_ltz(Numeric) -> DataTypes.TIMESTAMP_LTZ
-    Converts a numeric value of epoch milliseconds to a TIMESTAMP_LTZ. The 
default precision is 3.
-    2. to_timestamp_ltz(Numeric, Integer) -> DataTypes.TIMESTAMP_LTZ
-    Converts a numeric value of epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ.
-    Valid precisions are 0 or 3.
-    3. to_timestamp_ltz(String) -> DataTypes.TIMESTAMP_LTZ
-    Converts a timestamp string using default format 'yyyy-MM-dd HH:mm:ss.SSS' 
to a TIMESTAMP_LTZ.
-    4. to_timestamp_ltz(String, String) -> DataTypes.TIMESTAMP_LTZ
-    Converts a timestamp string using format (default 'yyyy-MM-dd 
HH:mm:ss.SSS') to a TIMESTAMP_LTZ.
-    5. to_timestamp_ltz(String, String, String) -> DataTypes.TIMESTAMP_LTZ
-    Converts a timestamp string string1 using format string2 (default 
'yyyy-MM-dd HH:mm:ss.SSS')
-    in time zone string3 (default 'UTC') to a TIMESTAMP_LTZ.
-    Supports any timezone that is available in Java's TimeZone database.
+    Converts a value to a TIMESTAMP_LTZ (timestamp with local time zone).
+
+    Supported signatures:
+
+    1. to_timestamp_ltz(numeric) -> TIMESTAMP_LTZ(3)
+       Converts a numeric epoch value to a TIMESTAMP_LTZ. The value is 
interpreted as
+       milliseconds since epoch (default precision 3).
+
+    2. to_timestamp_ltz(numeric, precision) -> TIMESTAMP_LTZ(max(precision, 3))
+       Converts a numeric epoch value to a TIMESTAMP_LTZ. The precision 
parameter (0-9)
+       determines the unit of the numeric value:
+
+       - precision 0: the numeric value represents seconds since epoch
+       - precision 3: the numeric value represents milliseconds since epoch
+       - precision 6: the numeric value represents microseconds since epoch
+       - precision 9: the numeric value represents nanoseconds since epoch
+
+       Other precision values between 0 and 9 are also supported, where the 
numeric
+       value represents units of 10^(-precision) seconds. The output type is
+       TIMESTAMP_LTZ(3) for precision 0-3, and TIMESTAMP_LTZ(precision) for 
4-9.
+
+    3. to_timestamp_ltz(string) -> TIMESTAMP_LTZ(3)
+       Parses a timestamp string using default format 'yyyy-MM-dd HH:mm:ss'.
+
+    4. to_timestamp_ltz(string, format) -> 
TIMESTAMP_LTZ(max(fractional_digits, 3))
+       Parses a timestamp string using the given format pattern. The output 
precision
+       is inferred from the number of 'S' characters in the format (e.g., 
'SSS' -> 3,
+       'SSSSSS' -> 6, 'SSSSSSSSS' -> 9), with a minimum of 3.
+
+    5. to_timestamp_ltz(string, format, timezone) -> 
TIMESTAMP_LTZ(max(fractional_digits, 3))
+       Parses a timestamp string using the given format pattern in the 
specified time zone.
+       The output precision is inferred from the format pattern as in 
signature 4.
+
+    Returns NULL if any input is NULL.
 
     Example:
     ::
 
-        >>> table.select(to_timestamp_ltz(100))  # numeric with default 
precision
-        >>> table.select(to_timestamp_ltz(100, 0))  # numeric with second 
precision
-        >>> table.select(to_timestamp_ltz(100, 3))  # numeric with millisecond 
precision
+        >>> table.select(to_timestamp_ltz(1234567890, 0))   # epoch seconds
+        >>> table.select(to_timestamp_ltz(1234567890123, 3)) # epoch 
milliseconds
+        >>> table.select(to_timestamp_ltz(1234567890123456789, 9)) # epoch 
nanoseconds
         >>> table.select(to_timestamp_ltz("2023-01-01 00:00:00"))  # string 
with default format
         >>> table.select(to_timestamp_ltz("01/01/2023", "MM/dd/yyyy"))  # 
string with format
         >>> table.select(to_timestamp_ltz("2023-01-01 00:00:00",
-                                        "yyyy-MM-dd HH:mm:ss",
-                                        "UTC"))  # string with format and 
timezone
+        ...                              "yyyy-MM-dd HH:mm:ss",
+        ...                              "Asia/Shanghai"))  # string with 
format and timezone
     """
     if len(args) == 1:
         return _unary_op("toTimestampLtz", args[0])
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
index 6367dd4b0a2..7a76a36a37c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
@@ -24,10 +24,12 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.utils.ValueDataTypeConverter;
+import org.apache.flink.table.utils.DateTimeUtils;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.util.Preconditions;
@@ -282,12 +284,9 @@ public final class ValueLiteralExpression implements 
ResolvedExpression {
                         localDateTime.toLocalDate(),
                         
localDateTime.toLocalTime().format(DateTimeFormatter.ISO_LOCAL_TIME));
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                final Instant instant = getValueAs(Instant.class).get();
-                if (instant.getNano() % 1_000_000 != 0) {
-                    throw new TableException(
-                            "Maximum precision for 
TIMESTAMP_WITH_LOCAL_TIME_ZONE literals is '3'");
-                }
-                return String.format("TO_TIMESTAMP_LTZ(%d, %d)", 
instant.toEpochMilli(), 3);
+                return serializeTimestampLtz(
+                        getValueAs(Instant.class).get(),
+                        ((LocalZonedTimestampType) 
dataType.getLogicalType()).getPrecision());
             case INTERVAL_YEAR_MONTH:
                 final Period period = 
getValueAs(Period.class).get().normalized();
                 return String.format(
@@ -429,4 +428,45 @@ public final class ValueLiteralExpression implements 
ResolvedExpression {
         }
         return StringUtils.arrayAwareToString(value);
     }
+
+    /**
+     * Serializes a TIMESTAMP_LTZ literal as a SQL string. Uses the numeric 
variant {@code
+     * TO_TIMESTAMP_LTZ(epoch, precision)} for readability when the value fits 
in a long, and falls
+     * back to the string variant for large values that would overflow.
+     */
+    private static String serializeTimestampLtz(Instant instant, int 
precision) {
+        String toTimestampLtzExpr;
+        if (canRepresentAsLong(instant, precision)) {
+            long epochValue = DateTimeUtils.toEpochValue(instant, precision);
+            toTimestampLtzExpr = String.format("TO_TIMESTAMP_LTZ(%d, %d)", 
epochValue, precision);
+        } else {
+            final LocalDateTime utcDateTime =
+                    LocalDateTime.ofInstant(instant, java.time.ZoneOffset.UTC);
+            final String formatPattern =
+                    precision > 0
+                            ? "yyyy-MM-dd HH:mm:ss." + "S".repeat(precision)
+                            : "yyyy-MM-dd HH:mm:ss";
+            final String timestampStr =
+                    
utcDateTime.format(DateTimeFormatter.ofPattern(formatPattern));
+            toTimestampLtzExpr =
+                    String.format(
+                            "TO_TIMESTAMP_LTZ('%s', '%s', 'UTC')", 
timestampStr, formatPattern);
+        }
+        // For precision 0-2, TO_TIMESTAMP_LTZ returns TIMESTAMP_LTZ(3), so we 
need a CAST
+        // to match the actual data type precision.
+        if (precision < 3) {
+            return String.format("CAST(%s AS TIMESTAMP_LTZ(%d))", 
toTimestampLtzExpr, precision);
+        }
+        return toTimestampLtzExpr;
+    }
+
+    /**
+     * Checks whether an {@link Instant} can be represented as a {@code long} 
epoch value at the
+     * given precision without overflow.
+     */
+    private static boolean canRepresentAsLong(Instant instant, int precision) {
+        long factor = (long) Math.pow(10, precision);
+        long epochSeconds = instant.getEpochSecond();
+        return factor == 0 || Math.abs(epochSeconds) <= Long.MAX_VALUE / 
factor;
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
index fe11e91e432..7481879b670 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.inference.TypeStrategy;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.utils.DateTimeUtils;
 
 import java.util.List;
 import java.util.Optional;
@@ -35,6 +36,8 @@ import java.util.Optional;
 @Internal
 public class ToTimestampLtzTypeStrategy implements TypeStrategy {
 
+    private static final int MIN_PRECISION = 0;
+    private static final int MAX_PRECISION = 9;
     private static final int DEFAULT_PRECISION = 3;
 
     @Override
@@ -52,44 +55,79 @@ public class ToTimestampLtzTypeStrategy implements 
TypeStrategy {
 
         LogicalType firstType = argumentTypes.get(0).getLogicalType();
         LogicalTypeRoot firstTypeRoot = firstType.getTypeRoot();
+        int outputPrecision = DEFAULT_PRECISION;
 
-        if (argCount == 1) {
-            if (!isCharacterType(firstTypeRoot) && 
!firstType.is(LogicalTypeFamily.NUMERIC)) {
-                throw new ValidationException(
-                        "Unsupported argument type. "
-                                + "When taking 1 argument, TO_TIMESTAMP_LTZ 
accepts an argument of type <VARCHAR>, <CHAR>, or <NUMERIC>.");
-            }
-        } else if (argCount == 2) {
-            LogicalType secondType = argumentTypes.get(1).getLogicalType();
-            LogicalTypeRoot secondTypeRoot = secondType.getTypeRoot();
-            if (firstType.is(LogicalTypeFamily.NUMERIC)) {
-                if (secondTypeRoot != LogicalTypeRoot.INTEGER) {
+        switch (argCount) {
+            case 1:
+                if (!isCharacterType(firstTypeRoot) && 
!firstType.is(LogicalTypeFamily.NUMERIC)) {
                     throw new ValidationException(
                             "Unsupported argument type. "
-                                    + "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) 
requires the second argument to be <INTEGER>.");
+                                    + "When taking 1 argument, 
TO_TIMESTAMP_LTZ accepts an argument of type <VARCHAR>, <CHAR>, or <NUMERIC>.");
                 }
-            } else if (isCharacterType(firstTypeRoot)) {
-                if (!isCharacterType(secondTypeRoot)) {
+                break;
+            case 2:
+                LogicalType secondType = argumentTypes.get(1).getLogicalType();
+                LogicalTypeRoot secondTypeRoot = secondType.getTypeRoot();
+                if (firstType.is(LogicalTypeFamily.NUMERIC)) {
+                    if (secondTypeRoot != LogicalTypeRoot.INTEGER) {
+                        throw new ValidationException(
+                                "Unsupported argument type. "
+                                        + "TO_TIMESTAMP_LTZ(<NUMERIC>, 
<INTEGER>) requires the second argument to be <INTEGER>.");
+                    }
+                    Optional<Integer> precisionOpt = 
callContext.getArgumentValue(1, Integer.class);
+                    if (precisionOpt.isPresent()) {
+                        int precision = precisionOpt.get();
+                        validatePrecision(precision);
+                        outputPrecision = Math.max(precision, 
DEFAULT_PRECISION);
+                    }
+                } else if (isCharacterType(firstTypeRoot)) {
+                    if (!isCharacterType(secondTypeRoot)) {
+                        throw new ValidationException(
+                                "Unsupported argument type. "
+                                        + "If the first argument is of type 
<VARCHAR> or <CHAR>, TO_TIMESTAMP_LTZ requires the second argument to be of 
type <VARCHAR> or <CHAR>.");
+                    }
+                    outputPrecision = inferPrecisionFromFormat(callContext);
+                } else {
                     throw new ValidationException(
                             "Unsupported argument type. "
-                                    + "If the first argument is of type 
<VARCHAR> or <CHAR>, TO_TIMESTAMP_LTZ requires the second argument to be of 
type <VARCHAR> or <CHAR>.");
+                                    + "When taking 2 arguments, 
TO_TIMESTAMP_LTZ requires the first argument to be of type <VARCHAR>, <CHAR>, 
or <NUMERIC>.");
                 }
-            } else {
-                throw new ValidationException(
-                        "Unsupported argument type. "
-                                + "When taking 2 arguments, TO_TIMESTAMP_LTZ 
requires the first argument to be of type <VARCHAR>, <CHAR>, or <NUMERIC>.");
-            }
-        } else if (argCount == 3) {
-            if (!isCharacterType(firstTypeRoot)
-                    || 
!isCharacterType(argumentTypes.get(1).getLogicalType().getTypeRoot())
-                    || 
!isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) {
-                throw new ValidationException(
-                        "Unsupported argument type. "
-                                + "When taking 3 arguments, TO_TIMESTAMP_LTZ 
requires all three arguments to be of type <VARCHAR> or <CHAR>.");
-            }
+                break;
+            case 3:
+                if (!isCharacterType(firstTypeRoot)
+                        || 
!isCharacterType(argumentTypes.get(1).getLogicalType().getTypeRoot())
+                        || 
!isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) {
+                    throw new ValidationException(
+                            "Unsupported argument type. "
+                                    + "When taking 3 arguments, 
TO_TIMESTAMP_LTZ requires all three arguments to be of type <VARCHAR> or 
<CHAR>.");
+                }
+                outputPrecision = inferPrecisionFromFormat(callContext);
+        }
+
+        return 
Optional.of(DataTypes.TIMESTAMP_LTZ(outputPrecision).nullable());
+    }
+
+    /**
+     * Infers the output precision from a format string literal. Returns at 
least {@link
+     * #DEFAULT_PRECISION}.
+     */
+    private static int inferPrecisionFromFormat(CallContext callContext) {
+        if (!callContext.isArgumentLiteral(1)) {
+            return DEFAULT_PRECISION;
         }
+        return callContext
+                .getArgumentValue(1, String.class)
+                .map(DateTimeUtils::precisionFromFormat)
+                .orElse(DEFAULT_PRECISION);
+    }
 
-        return 
Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION).nullable());
+    private static void validatePrecision(final int precision) {
+        if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
+            throw new ValidationException(
+                    String.format(
+                            "Precision for TO_TIMESTAMP_LTZ must be between %d 
and %d but was %d.",
+                            MIN_PRECISION, MAX_PRECISION, precision));
+        }
     }
 
     private boolean isCharacterType(LogicalTypeRoot typeRoot) {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index 5f9eb01f57a..754e694c53a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -142,7 +142,7 @@ public class DateTimeUtils {
                     .optionalEnd()
                     .toFormatter();
 
-    private static final Integer DEFAULT_PRECISION = 3;
+    private static final int DEFAULT_PRECISION = 3;
 
     /**
      * A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat 
is not thread-safe.
@@ -327,76 +327,76 @@ public class DateTimeUtils {
     // Numeric -> Timestamp conversion
     // 
--------------------------------------------------------------------------------------------
 
-    public static TimestampData toTimestampData(long v, int precision) {
-        switch (precision) {
-            case 0:
-                if (MIN_EPOCH_SECONDS <= v && v <= MAX_EPOCH_SECONDS) {
-                    return timestampDataFromEpochMills(v * MILLIS_PER_SECOND);
-                } else {
-                    return null;
-                }
-            case 3:
-                return timestampDataFromEpochMills(v);
-            default:
-                throw new TableException(
-                        "The precision value '"
-                                + precision
-                                + "' for function "
-                                + "TO_TIMESTAMP_LTZ(numeric, precision) is 
unsupported,"
-                                + " the supported value is '0' for second or 
'3' for millisecond.");
-        }
+    /**
+     * Converts a numeric epoch value to {@link TimestampData}. The precision 
specifies the unit of
+     * the epoch value: 0 for seconds, 3 for milliseconds, 6 for microseconds, 
9 for nanoseconds,
+     * and any value in between. Returns {@code null} if the value is out of 
the valid timestamp
+     * range.
+     */
+    public static TimestampData toTimestampData(long epoch, int precision) {
+        return epochToTimestampData(epoch, precision);
     }
 
-    public static TimestampData toTimestampData(double v, int precision) {
-        switch (precision) {
-            case 0:
-                if (MIN_EPOCH_SECONDS <= v && v <= MAX_EPOCH_SECONDS) {
-                    return timestampDataFromEpochMills((long) (v * 
MILLIS_PER_SECOND));
-                } else {
-                    return null;
-                }
-            case 3:
-                return timestampDataFromEpochMills((long) v);
-            default:
-                throw new TableException(
-                        "The precision value '"
-                                + precision
-                                + "' for function "
-                                + "TO_TIMESTAMP_LTZ(numeric, precision) is 
unsupported,"
-                                + " the supported value is '0' for second or 
'3' for millisecond.");
+    /**
+     * See {@link #toTimestampData(long, int)}. The double value is first 
converted to nanoseconds
+     * to preserve fractional parts, then processed at nanosecond precision. 
Returns {@code null} if
+     * the value is out of the valid timestamp range.
+     */
+    public static TimestampData toTimestampData(double epoch, int precision) {
+        double factor = Math.pow(10, precision);
+        double epochSeconds = epoch / factor;
+        if (epochSeconds < MIN_EPOCH_SECONDS || epochSeconds > 
MAX_EPOCH_SECONDS) {
+            return null;
         }
+        double nanoFactor = Math.pow(10, 9 - precision);
+        long epochNanos = (long) (epoch * nanoFactor);
+        return epochToTimestampData(epochNanos, 9);
     }
 
-    public static TimestampData toTimestampData(DecimalData v, int precision) {
-        long epochMills;
-        switch (precision) {
-            case 0:
-                epochMills =
-                        v.toBigDecimal().setScale(0, 
RoundingMode.DOWN).longValue()
-                                * MILLIS_PER_SECOND;
-                return timestampDataFromEpochMills(epochMills);
-            case 3:
-                epochMills = toMillis(v);
-                return timestampDataFromEpochMills(epochMills);
-            default:
-                throw new TableException(
-                        "The precision value '"
-                                + precision
-                                + "' for function "
-                                + "TO_TIMESTAMP_LTZ(numeric, precision) is 
unsupported,"
-                                + " the supported value is '0' for second or 
'3' for millisecond.");
-        }
+    /** See {@link #toTimestampData(long, int)}. The decimal value is 
truncated to a long. */
+    public static TimestampData toTimestampData(DecimalData epoch, int 
precision) {
+        long epochValue = epoch.toBigDecimal().setScale(0, 
RoundingMode.DOWN).longValue();
+        return epochToTimestampData(epochValue, precision);
     }
 
-    private static TimestampData timestampDataFromEpochMills(long epochMills) {
-        if (MIN_EPOCH_MILLS <= epochMills && epochMills <= MAX_EPOCH_MILLS) {
-            return TimestampData.fromEpochMillis(epochMills);
+    private static TimestampData epochToTimestampData(long epoch, int 
precision) {
+        long factor = (long) Math.pow(10, precision);
+        long epochSeconds = Math.floorDiv(epoch, factor);
+
+        if (epochSeconds < MIN_EPOCH_SECONDS || epochSeconds > 
MAX_EPOCH_SECONDS) {
+            return null;
         }
-        return null;
+
+        long remainder = Math.floorMod(epoch, factor);
+        long nanoMultiplier = (long) Math.pow(10, 9 - precision);
+        long nanoAdjustment = remainder * nanoMultiplier;
+
+        return TimestampData.fromInstant(Instant.ofEpochSecond(epochSeconds, 
nanoAdjustment));
+    }
+
+    /**
+     * Converts an {@link Instant} to an epoch value at the given precision. 
This is the inverse of
+     * {@link #toTimestampData(long, int)}.
+     */
+    public static long toEpochValue(Instant instant, int precision) {
+        long factor = (long) Math.pow(10, precision);
+        long nanoDivisor = (long) Math.pow(10, 9 - precision);
+        return (instant.getEpochSecond() * factor) + (instant.getNano() / 
nanoDivisor);
     }
 
-    private static long toMillis(DecimalData v) {
-        return v.toBigDecimal().setScale(0, RoundingMode.DOWN).longValue();
+    /**
+     * Infers fractional second precision from a format pattern by counting 
trailing 'S' characters.
+     * Returns at least {@link #DEFAULT_PRECISION} (3) and at most 9.
+     */
+    public static int precisionFromFormat(String format) {
+        int sCount = 0;
+        for (int i = format.length() - 1; i >= 0; i--) {
+            if (format.charAt(i) != 'S') {
+                break;
+            }
+            sCount++;
+        }
+        return Math.max(Math.min(sCount, 9), DEFAULT_PRECISION);
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -423,7 +423,18 @@ public class DateTimeUtils {
                         .toInstant());
     }
 
+    /**
+     * Parses a timestamp string with the given format, truncating to 
millisecond precision.
+     * Precision is hardcoded to match signature of TO_TIMESTAMP.
+     *
+     * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-14925";>FLINK-14925</a>
+     */
     public static TimestampData parseTimestampData(String dateStr, String 
format) {
+        return parseTimestampData(dateStr, format, 3);
+    }
+
+    /** Parses a timestamp string with the given format, truncating to the 
specified precision. */
+    public static TimestampData parseTimestampData(String dateStr, String 
format, int precision) {
         DateTimeFormatter formatter;
         try {
             formatter = DATETIME_FORMATTER_CACHE.get(format);
@@ -432,9 +443,7 @@ public class DateTimeUtils {
         }
         try {
             TemporalAccessor accessor = formatter.parse(dateStr);
-            // Precision is hardcoded to match signature of TO_TIMESTAMP
-            //  https://issues.apache.org/jira/browse/FLINK-14925
-            LocalDateTime ldt = fromTemporalAccessor(accessor, 3);
+            LocalDateTime ldt = fromTemporalAccessor(accessor, precision);
             return TimestampData.fromLocalDateTime(ldt);
         } catch (DateTimeParseException e) {
             // fall back to support cases like '1999-9-10 05:20:10' or 
'1999-9-10'
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java
index e677269902f..be87034542b 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java
@@ -24,6 +24,9 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
@@ -42,6 +45,7 @@ import java.time.ZonedDateTime;
 import java.time.temporal.ChronoField;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -278,6 +282,17 @@ class ExpressionTest {
                 .isEqualTo(expected);
     }
 
+    @ParameterizedTest(name = "precision {1}: {2}")
+    @MethodSource("timestampLtzPrecisionTestCases")
+    void testTimestampLtzPrecisionAsSerializableString(
+            Instant instant, int precision, String expected) {
+        assertThat(
+                        new ValueLiteralExpression(
+                                        instant, 
DataTypes.TIMESTAMP_LTZ(precision).notNull())
+                                
.asSerializableString(DefaultSqlFactory.INSTANCE))
+                .isEqualTo(expected);
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private static Expression createExpressionTree(Integer nestedValue) {
@@ -304,4 +319,42 @@ class ExpressionTest {
                                 DataTypes.BOOLEAN())),
                 DataTypes.BOOLEAN());
     }
+
+    private static Stream<Arguments> timestampLtzPrecisionTestCases() {
+        return Stream.of(
+                // Precision 0-2: numeric variant wrapped in CAST to match 
data type
+                Arguments.of(
+                        Instant.ofEpochSecond(1234),
+                        0,
+                        "CAST(TO_TIMESTAMP_LTZ(1234, 0) AS TIMESTAMP_LTZ(0))"),
+                Arguments.of(
+                        Instant.ofEpochSecond(1, 100_000_000),
+                        1,
+                        "CAST(TO_TIMESTAMP_LTZ(11, 1) AS TIMESTAMP_LTZ(1))"),
+                Arguments.of(
+                        Instant.ofEpochSecond(1, 120_000_000),
+                        2,
+                        "CAST(TO_TIMESTAMP_LTZ(112, 2) AS TIMESTAMP_LTZ(2))"),
+                // Precision 3+: numeric variant without CAST
+                Arguments.of(Instant.ofEpochMilli(1234567), 3, 
"TO_TIMESTAMP_LTZ(1234567, 3)"),
+                Arguments.of(Instant.ofEpochSecond(1, 123400000), 4, 
"TO_TIMESTAMP_LTZ(11234, 4)"),
+                Arguments.of(Instant.ofEpochSecond(1, 123450000), 5, 
"TO_TIMESTAMP_LTZ(112345, 5)"),
+                Arguments.of(
+                        Instant.ofEpochSecond(1, 123456000), 6, 
"TO_TIMESTAMP_LTZ(1123456, 6)"),
+                Arguments.of(
+                        Instant.ofEpochSecond(1, 123456700), 7, 
"TO_TIMESTAMP_LTZ(11234567, 7)"),
+                Arguments.of(
+                        Instant.ofEpochSecond(1, 123456780), 8, 
"TO_TIMESTAMP_LTZ(112345678, 8)"),
+                Arguments.of(
+                        Instant.ofEpochSecond(1, 123456789), 9, 
"TO_TIMESTAMP_LTZ(1123456789, 9)"),
+                // Edge cases: large instants fall back to string variant to 
avoid long overflow
+                Arguments.of(
+                        Instant.parse("9999-12-31T23:59:59.999999999Z"),
+                        9,
+                        "TO_TIMESTAMP_LTZ('9999-12-31 23:59:59.999999999', 
'yyyy-MM-dd HH:mm:ss.SSSSSSSSS', 'UTC')"),
+                Arguments.of(
+                        Instant.parse("2262-04-12T00:00:00Z"),
+                        9,
+                        "TO_TIMESTAMP_LTZ('2262-04-12 00:00:00.000000000', 
'yyyy-MM-dd HH:mm:ss.SSSSSSSSS', 'UTC')"));
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
index 27331bb7e6c..08fcabc2728 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
@@ -29,6 +29,7 @@ class ToTimestampLtzTypeStrategyTest extends 
TypeStrategiesTestBase {
     @Override
     protected Stream<TestSpec> testData() {
         return Stream.of(
+                // Single argument: defaults to TIMESTAMP_LTZ(3)
                 TestSpec.forStrategy(
                                 "Valid single argument of type <VARCHAR> or 
<CHAR>",
                                 SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
@@ -45,6 +46,7 @@ class ToTimestampLtzTypeStrategyTest extends 
TypeStrategiesTestBase {
                         .inputTypes(DataTypes.BOOLEAN())
                         .expectErrorMessage(
                                 "Unsupported argument type. When taking 1 
argument, TO_TIMESTAMP_LTZ accepts an argument of type <VARCHAR>, <CHAR>, or 
<NUMERIC>."),
+                // Two arguments without literal: defaults to TIMESTAMP_LTZ(3)
                 TestSpec.forStrategy(
                                 "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)",
                                 SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
@@ -73,6 +75,7 @@ class ToTimestampLtzTypeStrategyTest extends 
TypeStrategiesTestBase {
                         .inputTypes(DataTypes.BOOLEAN(), DataTypes.FLOAT())
                         .expectErrorMessage(
                                 "Unsupported argument type. When taking 2 
arguments, TO_TIMESTAMP_LTZ requires the first argument to be of type 
<VARCHAR>, <CHAR>, or <NUMERIC>."),
+                // Three arguments: defaults to TIMESTAMP_LTZ(3)
                 TestSpec.forStrategy(
                                 "Valid three arguments", 
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
                         .inputTypes(DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
@@ -93,6 +96,77 @@ class ToTimestampLtzTypeStrategyTest extends 
TypeStrategiesTestBase {
                                 DataTypes.STRING(),
                                 DataTypes.STRING())
                         .expectErrorMessage(
-                                "Unsupported argument type. TO_TIMESTAMP_LTZ 
requires 1 to 3 arguments, but 4 were provided."));
+                                "Unsupported argument type. TO_TIMESTAMP_LTZ 
requires 1 to 3 arguments, but 4 were provided."),
+                // Precision 0-3: clamped to TIMESTAMP_LTZ(3)
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) with 
precision 0",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT(), DataTypes.INT())
+                        .calledWithLiteralAt(1, 0)
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) with 
precision 3",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT(), DataTypes.INT())
+                        .calledWithLiteralAt(1, 3)
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                // Precision 4-9: follows input precision
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) with 
precision 4",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT(), DataTypes.INT())
+                        .calledWithLiteralAt(1, 4)
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(4).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) with 
precision 6",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT(), DataTypes.INT())
+                        .calledWithLiteralAt(1, 6)
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(6).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) with 
precision 9",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT(), DataTypes.INT())
+                        .calledWithLiteralAt(1, 9)
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(9).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<DOUBLE>, <INTEGER>) with 
precision 9",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.DOUBLE(), DataTypes.INT())
+                        .calledWithLiteralAt(1, 9)
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(9).nullable()),
+                // Out of range
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) with 
precision out of range",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT(), DataTypes.INT())
+                        .calledWithLiteralAt(1, 10)
+                        .expectErrorMessage(
+                                "Precision for TO_TIMESTAMP_LTZ must be 
between 0 and 9 but was 10."),
+                // Format-based precision inference for string variants
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<STRING>, <STRING>) with no 
S in format returns TIMESTAMP_LTZ(3)",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.STRING())
+                        .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss")
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<STRING>, <STRING>) with SSS 
format returns TIMESTAMP_LTZ(3)",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.STRING())
+                        .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSS")
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<STRING>, <STRING>) with 
SSSSSS format returns TIMESTAMP_LTZ(6)",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.STRING())
+                        .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSSSSS")
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(6).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<STRING>, <STRING>) with 
SSSSSSSSS format returns TIMESTAMP_LTZ(9)",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.STRING())
+                        .calledWithLiteralAt(1, "yyyy-MM-dd 
HH:mm:ss.SSSSSSSSS")
+                        
.expectDataType(DataTypes.TIMESTAMP_LTZ(9).nullable()));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/LiteralExpressionsSerializationITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/LiteralExpressionsSerializationITCase.java
index 05eebac2dd6..c24b84b1b00 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/LiteralExpressionsSerializationITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/LiteralExpressionsSerializationITCase.java
@@ -85,7 +85,10 @@ public class LiteralExpressionsSerializationITCase {
                                 lit(localTimeWithoutSeconds, 
DataTypes.TIME().notNull()),
                                 lit(localDateTime, 
DataTypes.TIMESTAMP(3).notNull()),
                                 lit(localDateTimeWithoutSeconds, 
DataTypes.TIMESTAMP(3).notNull()),
+                                lit(instant, 
DataTypes.TIMESTAMP_LTZ(0).notNull()),
                                 lit(instant, 
DataTypes.TIMESTAMP_LTZ(3).notNull()),
+                                lit(instant, 
DataTypes.TIMESTAMP_LTZ(6).notNull()),
+                                lit(instant, 
DataTypes.TIMESTAMP_LTZ(9).notNull()),
                                 lit(
                                         duration,
                                         DataTypes.INTERVAL(DataTypes.DAY(), 
DataTypes.SECOND(9))
@@ -122,7 +125,10 @@ public class LiteralExpressionsSerializationITCase {
                                 + "TIME '12:12:00',\n"
                                 + "TIMESTAMP '2024-02-03 12:12:12.333',\n"
                                 + "TIMESTAMP '2024-02-03 12:12:00',\n"
+                                + "CAST(TO_TIMESTAMP_LTZ(1234, 0) AS 
TIMESTAMP_LTZ(0)),\n"
                                 + "TO_TIMESTAMP_LTZ(1234567, 3),\n"
+                                + "TO_TIMESTAMP_LTZ(1234567000, 6),\n"
+                                + "TO_TIMESTAMP_LTZ(1234567000000, 9),\n"
                                 + "INTERVAL '99 00:00:34.999' DAY TO 
SECOND(3),\n"
                                 + "INTERVAL '39-2' YEAR TO MONTH");
 
@@ -148,6 +154,9 @@ public class LiteralExpressionsSerializationITCase {
                                 localTimeWithoutSeconds,
                                 localDateTime,
                                 localDateTimeWithoutSeconds,
+                                Instant.ofEpochSecond(1234),
+                                instant,
+                                instant,
                                 instant,
                                 duration,
                                 period));
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
index fe4b766ffd0..d606dc15313 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
@@ -815,11 +815,11 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase 
{
         return Stream.of(
                 
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
                         .onFieldsWithData(
-                                100,
-                                1234,
-                                -100,
+                                100.0d,
+                                1234L,
+                                -100L,
                                 DecimalDataUtils.castFrom(-Double.MAX_VALUE, 
38, 18),
-                                100.01,
+                                100.01f,
                                 "unparsable",
                                 null)
                         .andDataTypes(
@@ -853,6 +853,11 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                 TIMESTAMP_LTZ(3).nullable())
                         .testResult(
                                 toTimestampLtz($("f3"), literal(0)),
+                                "TO_TIMESTAMP_LTZ(f3, 0)",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(-Double.MAX_VALUE, literal(0)),
                                 "TO_TIMESTAMP_LTZ(-" + Double.MAX_VALUE + ", 
0)",
                                 null,
                                 TIMESTAMP_LTZ(3).nullable())
@@ -888,24 +893,24 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase 
{
                                 toTimestampLtz(
                                         "1970-01-01 00:00:00.12345", 
"yyyy-MM-dd HH:mm:ss.SSSSS"),
                                 "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.12345', 
'yyyy-MM-dd HH:mm:ss.SSSSS')",
-                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
123000000)
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
123450000)
                                         .atZone(ZoneOffset.UTC)
                                         .toInstant(),
-                                TIMESTAMP_LTZ(3).nullable())
+                                TIMESTAMP_LTZ(5).nullable())
                         .testResult(
                                 toTimestampLtz("20000202 59:59.1234567", 
"yyyyMMdd mm:ss.SSSSSSS"),
                                 "TO_TIMESTAMP_LTZ('20000202 59:59.1234567', 
'yyyyMMdd mm:ss.SSSSSSS')",
-                                LocalDateTime.of(2000, 2, 2, 0, 59, 59, 
123000000)
+                                LocalDateTime.of(2000, 2, 2, 0, 59, 59, 
123456700)
                                         .atZone(ZoneOffset.UTC)
                                         .toInstant(),
-                                TIMESTAMP_LTZ(3).nullable())
+                                TIMESTAMP_LTZ(7).nullable())
                         .testResult(
                                 toTimestampLtz("1234567", "SSSSSSS"),
                                 "TO_TIMESTAMP_LTZ('1234567', 'SSSSSSS')",
-                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
123000000)
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
123456700)
                                         .atZone(ZoneOffset.UTC)
                                         .toInstant(),
-                                TIMESTAMP_LTZ(3).nullable())
+                                TIMESTAMP_LTZ(7).nullable())
                         .testResult(
                                 toTimestampLtz(
                                         "2017-09-15 00:00:00.12345", 
"yyyy-MM-dd HH:mm:ss.SSS"),
@@ -914,6 +919,14 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                         .atZone(ZoneOffset.UTC)
                                         .toInstant(),
                                 TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01 00:00:00.1", "yyyy-MM-dd 
HH:mm:ss.SSSSSS"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00.1', 
'yyyy-MM-dd HH:mm:ss.SSSSSS')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 
100000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(6).nullable())
                         .testResult(
                                 toTimestampLtz(
                                         "2023-01-01 00:00:00",
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 60e93f1004c..15fb9017205 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -1283,24 +1283,6 @@ class TemporalTypesTest extends ExpressionTestBase {
       s"TO_TIMESTAMP_LTZ(253402300800000, 3)",
       "NULL")
 
-    // invalid precision
-    testExpectedAllApisException(
-      toTimestampLtz(12, 1),
-      "TO_TIMESTAMP_LTZ(12, 1)",
-      "The precision value '1' for function TO_TIMESTAMP_LTZ(numeric, 
precision) is unsupported," +
-        " the supported value is '0' for second or '3' for millisecond.",
-      classOf[TableException]
-    )
-
-    // invalid precision
-    testExpectedAllApisException(
-      toTimestampLtz(1000000000, 9),
-      "TO_TIMESTAMP_LTZ(1000000000, 9)",
-      "The precision value '9' for function TO_TIMESTAMP_LTZ(numeric, 
precision) is unsupported," +
-        " the supported value is '0' for second or '3' for millisecond.",
-      classOf[TableException]
-    )
-
     // invalid type for the first input
     testExpectedSqlException(
       "TO_TIMESTAMP_LTZ('test_string_type', 0)",
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java
index e1b8478c210..dbcf30a61da 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java
@@ -115,7 +115,9 @@ public class ToTimestampLtzFunction extends 
BuiltInScalarFunction {
             return null;
         }
 
-        return parseTimestampData(timestamp.toString(), format.toString());
+        String formatStr = format.toString();
+        return parseTimestampData(
+                timestamp.toString(), formatStr, 
DateTimeUtils.precisionFromFormat(formatStr));
     }
 
     public @Nullable TimestampData eval(
@@ -124,7 +126,12 @@ public class ToTimestampLtzFunction extends 
BuiltInScalarFunction {
             return null;
         }
 
-        TimestampData ts = parseTimestampData(dateStr.toString(), 
format.toString());
+        String formatStr = format.toString();
+        TimestampData ts =
+                parseTimestampData(
+                        dateStr.toString(),
+                        formatStr,
+                        DateTimeUtils.precisionFromFormat(formatStr));
         if (ts == null) {
             return null;
         }

Reply via email to