This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new e7422c9340d [FLINK-36644] TIMESTAMPDIFF can not be string serialized
(#25599)
e7422c9340d is described below
commit e7422c9340d76768b9770609dfa4fd17d873cc16
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Nov 4 09:12:05 2024 +0100
[FLINK-36644] TIMESTAMPDIFF can not be string serialized (#25599)
---
.../expressions/ExpressionSerializationTest.java | 46 ++++++++++++----------
.../functions/BuiltInFunctionDefinitions.java | 14 +++++++
2 files changed, 40 insertions(+), 20 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
index d9e9728f9c4..22d85bad8eb 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
@@ -48,6 +48,7 @@ import java.util.Optional;
import java.util.stream.Stream;
import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.timestampDiff;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for serializing {@link BuiltInFunctionDefinitions} into a SQL
string. */
@@ -193,24 +194,6 @@ public class ExpressionSerializationTest {
TestSpec.forExpr($("f0").substring(2, 5))
.withField("f0", DataTypes.STRING())
.expectStr("SUBSTRING(`f0` FROM 2 FOR 5)"),
- TestSpec.forExpr($("f0").extract(TimeIntervalUnit.HOUR))
- .withField("f0", DataTypes.TIMESTAMP())
- .expectStr("EXTRACT(HOUR FROM `f0`)"),
- TestSpec.forExpr($("f0").floor(TimeIntervalUnit.HOUR))
- .withField("f0", DataTypes.TIMESTAMP())
- .expectStr("FLOOR(`f0` TO HOUR)"),
- TestSpec.forExpr($("f0").ceil(TimeIntervalUnit.HOUR))
- .withField("f0", DataTypes.TIMESTAMP())
- .expectStr("CEIL(`f0` TO HOUR)"),
- TestSpec.forExpr(
- Expressions.temporalOverlaps(
- $("f0"), $("f1"),
- $("f2"), $("f3")))
- .withField("f0", DataTypes.TIMESTAMP())
- .withField("f1", DataTypes.TIMESTAMP())
- .withField("f2", DataTypes.TIMESTAMP())
- .withField("f3", DataTypes.TIMESTAMP())
- .expectStr("(`f0`, `f1`) OVERLAPS (`f2`, `f3`)"),
TestSpec.forExpr($("f0").get("g0").plus($("f0").get("g1").get("h1")))
.withField(
"f0",
@@ -305,8 +288,31 @@ public class ExpressionSerializationTest {
.plus($("f0").avg().distinct())
.plus($("f0").max()))
.withField("f0", DataTypes.BIGINT())
- .expectStr(
- "((COUNT(DISTINCT `f0`)) + (AVG(DISTINCT
`f0`))) + (MAX(`f0`))"));
+ .expectStr("((COUNT(DISTINCT `f0`)) + (AVG(DISTINCT
`f0`))) + (MAX(`f0`))"),
+
+ // Time functions
+ TestSpec.forExpr($("f0").extract(TimeIntervalUnit.HOUR))
+ .withField("f0", DataTypes.TIMESTAMP())
+ .expectStr("EXTRACT(HOUR FROM `f0`)"),
+ TestSpec.forExpr($("f0").floor(TimeIntervalUnit.HOUR))
+ .withField("f0", DataTypes.TIMESTAMP())
+ .expectStr("FLOOR(`f0` TO HOUR)"),
+ TestSpec.forExpr($("f0").ceil(TimeIntervalUnit.HOUR))
+ .withField("f0", DataTypes.TIMESTAMP())
+ .expectStr("CEIL(`f0` TO HOUR)"),
+ TestSpec.forExpr(
+ Expressions.temporalOverlaps(
+ $("f0"), $("f1"),
+ $("f2"), $("f3")))
+ .withField("f0", DataTypes.TIMESTAMP())
+ .withField("f1", DataTypes.TIMESTAMP())
+ .withField("f2", DataTypes.TIMESTAMP())
+ .withField("f3", DataTypes.TIMESTAMP())
+ .expectStr("(`f0`, `f1`) OVERLAPS (`f2`, `f3`)"),
+ TestSpec.forExpr(timestampDiff(TimePointUnit.DAY, $("f0"),
$("f1")))
+ .withField("f0", DataTypes.TIMESTAMP())
+ .withField("f1", DataTypes.TIMESTAMP())
+ .expectStr("TIMESTAMPDIFF(DAY, `f0`, `f1`)"));
}
@ParameterizedTest
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 24ecc558ddb..7aed83fcf6f 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.JsonQueryWrapper;
import org.apache.flink.table.api.JsonType;
import org.apache.flink.table.api.JsonValueOnEmptyOrError;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import org.apache.flink.table.expressions.TimePointUnit;
import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -56,6 +57,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.BOOLEAN;
@@ -1930,6 +1932,18 @@ public final class BuiltInFunctionDefinitions {
BuiltInFunctionDefinition.newBuilder()
.name("timestampDiff")
.kind(SCALAR)
+ .callSyntax(
+ "TIMESTAMPDIFF",
+ (sqlName, operands) ->
+ String.format(
+ "%s(%s, %s)",
+ sqlName,
+ CallSyntaxUtils.getSymbolLiteral(
+ operands.get(0),
TimePointUnit.class)
+ .name(),
+ operands.subList(1,
operands.size()).stream()
+
.map(ResolvedExpression::asSerializableString)
+
.collect(Collectors.joining(", "))))
.inputTypeStrategy(
sequence(
symbol(