wuchong commented on a change in pull request #15485:
URL: https://github.com/apache/flink/pull/15485#discussion_r607112374



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
##########
@@ -188,7 +189,8 @@ public WindowCombineFunction create(
                 InternalTimerService<Long> timerService,
                 KeyedStateBackend<RowData> stateBackend,
                 WindowState<Long> windowState,
-                boolean isEventTime)
+                boolean isEventTime,
+                ZoneId shiftTimeZone)

Review comment:
       `TopNRecordsCombiner` also registers event time timer, so I think we 
should also shift it. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java
##########
@@ -99,9 +100,11 @@ public void open(TriggerContext ctx) throws Exception {
         public boolean onElement(Object element, long timestamp, W window) 
throws Exception {
             ReducingState<Long> nextFiring = 
ctx.getPartitionedState(nextFiringStateDesc);
             if (nextFiring.get() == null) {
-                long nextTimer = ctx.getCurrentProcessingTime() + interval;
-                ctx.registerProcessingTimeTimer(nextTimer);
-                nextFiring.add(nextTimer);
+                long nextShiftedTimer =
+                        toUtcTimestampMills(ctx.getCurrentProcessingTime(), 
ctx.getShiftTimeZone())

Review comment:
       Personally, I don't think we need to shift in this case. The registered 
timer should in local zone instead of UTC zone, `long nextTimer = 
ctx.getCurrentProcessingTime() + interval` should be correct. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -48,9 +48,10 @@ object WatermarkGeneratorCodeGenerator {
       contextTerm: Option[String] = None): GeneratedWatermarkGenerator = {
     // validation
     val watermarkOutputType = 
FlinkTypeFactory.toLogicalType(watermarkExpr.getType)
-    if (watermarkOutputType.getTypeRoot != 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
+    if (! (watermarkOutputType.getTypeRoot == 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE ||

Review comment:
       nit: remove blank space after `!`.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalFunctionJoinTest.scala
##########
@@ -46,10 +46,12 @@ class TemporalFunctionJoinTest extends TableTestBase {
   private val proctimeRatesHistory = util.addDataStream[(String, Int)](
     "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime)
 
-  util.addFunction(
+  util.addTemporarySystemFunction(
     "ProctimeRates",
     proctimeRatesHistory.createTemporalTableFunction($"proctime", $"currency"))
 
+
+

Review comment:
       nit: remove empty line

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala
##########
@@ -84,78 +94,115 @@ class TimeAttributeITCase extends StreamingTestBase {
   }
 
   @Test
-  def testWindowAggregateOnCustomizedWatermark(): Unit = {
-    JavaFunc5.openCalled = false
-    JavaFunc5.closeCalled = false
-    tEnv.createTemporaryFunction("myFunc", new JavaFunc5)
+  def testWindowAggregateOnTimestampLtzWatermark(): Unit = {
+    val zoneId = "Asia/Shanghai"
+    tEnv.getConfig.setLocalTimeZone(ZoneId.of(zoneId))
     val ddl =
       s"""
-        |CREATE TABLE src (
-        |  log_ts STRING,
-        |  ts TIMESTAMP(3),
-        |  a INT,
-        |  b DOUBLE,
-        |  WATERMARK FOR ts AS myFunc(ts, a)
-        |) WITH (
-        |  'connector' = 'values',
-        |  'data-id' = '$dataId'
-        |)
+         |CREATE TABLE src1 (
+         |  log_ts STRING,
+         |  ts BIGINT,
+         |  a INT,
+         |  b DOUBLE,
+         |  ltz_ts AS TO_TIMESTAMP_LTZ(ts, 3),
+         |  WATERMARK FOR ltz_ts AS ltz_ts - INTERVAL '0.001' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$ltzDataId'
+         |)
       """.stripMargin
     val query =
       """
-        |SELECT TUMBLE_END(ts, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
-        |FROM src
-        |GROUP BY TUMBLE(ts, INTERVAL '0.003' SECOND)
+        |SELECT TUMBLE_END(ltz_ts, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
+        |FROM src1
+        |GROUP BY TUMBLE(ltz_ts, INTERVAL '0.003' SECOND)
       """.stripMargin
     tEnv.executeSql(ddl)
-    val sink = new TestingAppendSink()
+    val sink = new TestingAppendSink(TimeZone.getTimeZone(zoneId))
     tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
     env.execute("SQL JOB")
 
     val expected = Seq(
-      "1970-01-01T00:00:00.003,2,3.0",
-      "1970-01-01T00:00:00.006,2,7.0",
-      "1970-01-01T00:00:00.009,2,6.0",
-      "1970-01-01T00:00:00.018,1,4.0")
+      "1970-01-01T08:00:00.003,2,3.0",
+      "1970-01-01T08:00:00.006,2,7.0",
+      "1970-01-01T08:00:00.009,2,6.0",
+      "1970-01-01T08:00:00.018,1,4.0")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
-    assertTrue(JavaFunc5.openCalled)
-    assertTrue(JavaFunc5.closeCalled)
   }
 
-  @Test
-  def testWindowAggregateOnComputedRowtime(): Unit = {
-    val ddl =
-      s"""
-        |CREATE TABLE src (
-        |  log_ts STRING,
-        |  ts TIMESTAMP(3),
-        |  a INT,
-        |  b DOUBLE,
-        |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
-        |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
-        |) WITH (
-        |  'connector' = 'values',
-        |  'data-id' = '$dataId'
-        |)
-      """.stripMargin
-    val query =
-      """
-        |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
-        |FROM src
-        |GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
-      """.stripMargin
-    tEnv.executeSql(ddl)
-    val sink = new TestingAppendSink()
-    tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
-    env.execute("SQL JOB")
-
-    val expected = Seq(
-      "1970-01-01T00:00:00.003,2,3.0",
-      "1970-01-01T00:00:00.006,2,7.0",
-      "1970-01-01T00:00:00.009,2,6.0",
-      "1970-01-01T00:00:00.018,1,4.0")
-    assertEquals(expected.sorted, sink.getAppendResults.sorted)
-  }
+//  @Test
+//  def testWindowAggregateOnCustomizedWatermark(): Unit = {

Review comment:
       Why remove these 2 tests?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
##########
@@ -519,18 +520,19 @@ private static void validateColumnsAndWatermarkSpecs(
                                                     String.format(
                                                             "Rowtime attribute 
'%s' is not defined in schema.",
                                                             
rowtimeAttribute)));
-            if (rowtimeType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) {
+            if (!(rowtimeType.getTypeRoot() == 
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+                    || rowtimeType.getTypeRoot() == 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
                 throw new ValidationException(
                         String.format(
-                                "Rowtime attribute '%s' must be of type 
TIMESTAMP but is of type '%s'.",
+                                "Rowtime attribute '%s' must be of type 
TIMESTAMP or TIMESTAMP_LTZ but is of type '%s'.",
                                 rowtimeAttribute, rowtimeType));
             }
             LogicalType watermarkOutputType =
                     watermark.getWatermarkExprOutputType().getLogicalType();
-            if (watermarkOutputType.getTypeRoot() != 
TIMESTAMP_WITHOUT_TIME_ZONE) {
+            if (!supportedWatermarkType(watermarkOutputType)) {

Review comment:
       I think we can and we should just compare equality of 
`watermarkOutputType` and `rowtimeType`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+@Internal
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = 
TimeZone.getTimeZone("UTC").toZoneId();
+
+    private static final long SECONDS_PER_HOUR = 60 * 60L;
+
+    private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L;
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate 
date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 
1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 1000 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param shiftTimeZone the timezone that the given timestamp mills has 
been shifted.
+     * @return the mills which can describe the local timestamp string in 
given timezone.
+     */
+    public static long toUtcTimestampMills(long epochMills, ZoneId 
shiftTimeZone) {
+        if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == epochMills) 
{
+            return epochMills;
+        }
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), 
shiftTimeZone);
+        return localDateTime.atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
+    }
+
+    /**
+     * Get a timer time according to the timestamp mills and the given shift 
timezone.
+     *
+     * @param utcTimestampMills the timestamp mills.
+     * @param shiftTimeZone the timezone that the given timestamp mills has 
been shifted.
+     * @return the epoch mills.
+     */
+    public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId 
shiftTimeZone) {
+        if (UTC_ZONE_ID.equals(shiftTimeZone.equals(shiftTimeZone))
+                || Long.MAX_VALUE == utcTimestampMills) {

Review comment:
       ditto.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+@Internal
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = 
TimeZone.getTimeZone("UTC").toZoneId();
+
+    private static final long SECONDS_PER_HOUR = 60 * 60L;
+
+    private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L;
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate 
date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 
1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 1000 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param shiftTimeZone the timezone that the given timestamp mills has 
been shifted.
+     * @return the mills which can describe the local timestamp string in 
given timezone.
+     */
+    public static long toUtcTimestampMills(long epochMills, ZoneId 
shiftTimeZone) {
+        if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == epochMills) 
{

Review comment:
       Add comment to explain the special logic.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala
##########
@@ -63,4 +110,31 @@ class UnionTest extends TableTestBase {
     util.verifyRelPlanWithType(sqlQuery)
   }
 
+  @Test
+  def testUnionDiffRowTime(): Unit = {
+    expectedException.expectMessage("Union fields with time attributes have 
different types")

Review comment:
       nit: would be better to add more information, e.g. the time attributes 
are...

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
##########
@@ -52,13 +54,13 @@ public static boolean isTimePoint(LogicalType type) {
     }
 
     public static boolean isRowTime(LogicalType type) {
-        return type instanceof TimestampType
-                && ((TimestampType) type).getKind() == TimestampKind.ROWTIME;
+        return (type instanceof TimestampType || type instanceof 
LocalZonedTimestampType)
+                && isTimeAttribute(type);

Review comment:
       Should be `isRowtimeAttribute`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
##########
@@ -676,7 +714,19 @@ class RexTimeIndicatorMaterializer(
       // extraction
       case FINAL if updatedCall.getOperands.size() == 1
         && isMatchTimeIndicator(updatedCall.getOperands.get(0)) =>
-        updatedCall
+        val rowtimeType = updatedCall.getOperands.get(0).getType
+        updatedCall.clone(
+          rowtimeType,
+          materializedOperands)
+
+      case FlinkSqlOperatorTable.MATCH_ROWTIME if 
isTimeIndicatorType(updatedCall.getType) =>
+        val rowtimeType = input.filter(isTimeIndicatorType).head
+

Review comment:
       remove empty line and add comments

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
##########
@@ -226,11 +225,10 @@ private void validateTimeColumn(String columnName, 
List<Column> columns) {
     }
 
     private void validateWatermarkExpression(LogicalType watermarkType) {
-        if (!hasRoot(watermarkType, 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
-                || getPrecision(watermarkType) != 3) {
+        if (!supportedWatermarkType(watermarkType) || 
getPrecision(watermarkType) != 3) {

Review comment:
       Please also remember to validate watermark output type should be the 
same with rowtime type. And please add tests for this. 

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
##########
@@ -109,15 +111,38 @@ public static boolean hasFamily(LogicalType logicalType, 
LogicalTypeFamily famil
     }
 
     public static boolean isTimeAttribute(LogicalType logicalType) {
-        return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != 
TimestampKind.REGULAR;
+        return hasFamily(logicalType, TIMESTAMP)

Review comment:
       Just check type root here to be more strict.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
##########
@@ -364,6 +364,69 @@ class MatchRecognizeITCase(backend: StateBackendMode) 
extends StreamingWithState
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
+  @Test
+  def testWindowedGroupingAppliedToMatchRecognizeOnLtzRowtime(): Unit = {

Review comment:
       How can we get deterministic results for every time zone for 
TIMESTAMP_LTZ rowtime? 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
##########
@@ -676,7 +714,19 @@ class RexTimeIndicatorMaterializer(
       // extraction
       case FINAL if updatedCall.getOperands.size() == 1

Review comment:
       update comments about `FINAL(MATCH_ROWTIME)`.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
##########
@@ -213,11 +213,10 @@ private void validateTimeColumn(String columnName, 
List<Column> columns) {
                             
columns.stream().map(Column::getName).collect(Collectors.toList())));
         }
         final LogicalType timeFieldType = 
timeColumn.get().getDataType().getLogicalType();
-        if (!hasRoot(timeFieldType, 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
-                || getPrecision(timeFieldType) != 3) {
+        if (!supportedWatermarkType(timeFieldType) || 
getPrecision(timeFieldType) != 3) {

Review comment:
       `supportedWatermarkType` looks weird because time field of cause can't 
be watermark. I think you may want to use `canBeTimeAttributeType` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to