wuchong commented on a change in pull request #15485: URL: https://github.com/apache/flink/pull/15485#discussion_r607112374
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java ########## @@ -188,7 +189,8 @@ public WindowCombineFunction create( InternalTimerService<Long> timerService, KeyedStateBackend<RowData> stateBackend, WindowState<Long> windowState, - boolean isEventTime) + boolean isEventTime, + ZoneId shiftTimeZone) Review comment: `TopNRecordsCombiner` also registers event time timers, so I think we should also shift them. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java ########## @@ -99,9 +100,11 @@ public void open(TriggerContext ctx) throws Exception { public boolean onElement(Object element, long timestamp, W window) throws Exception { ReducingState<Long> nextFiring = ctx.getPartitionedState(nextFiringStateDesc); if (nextFiring.get() == null) { - long nextTimer = ctx.getCurrentProcessingTime() + interval; - ctx.registerProcessingTimeTimer(nextTimer); - nextFiring.add(nextTimer); + long nextShiftedTimer = + toUtcTimestampMills(ctx.getCurrentProcessingTime(), ctx.getShiftTimeZone()) Review comment: Personally, I don't think we need to shift in this case. The registered timer should be in the local time zone instead of the UTC zone, so `long nextTimer = ctx.getCurrentProcessingTime() + interval` should be correct. ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala ########## @@ -48,9 +48,10 @@ object WatermarkGeneratorCodeGenerator { contextTerm: Option[String] = None): GeneratedWatermarkGenerator = { // validation val watermarkOutputType = FlinkTypeFactory.toLogicalType(watermarkExpr.getType) - if (watermarkOutputType.getTypeRoot != LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { + if (! 
(watermarkOutputType.getTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE || Review comment: nit: remove blank space after `!`. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalFunctionJoinTest.scala ########## @@ -46,10 +46,12 @@ class TemporalFunctionJoinTest extends TableTestBase { private val proctimeRatesHistory = util.addDataStream[(String, Int)]( "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime) - util.addFunction( + util.addTemporarySystemFunction( "ProctimeRates", proctimeRatesHistory.createTemporalTableFunction($"proctime", $"currency")) + + Review comment: nit: remove empty line ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala ########## @@ -84,78 +94,115 @@ class TimeAttributeITCase extends StreamingTestBase { } @Test - def testWindowAggregateOnCustomizedWatermark(): Unit = { - JavaFunc5.openCalled = false - JavaFunc5.closeCalled = false - tEnv.createTemporaryFunction("myFunc", new JavaFunc5) + def testWindowAggregateOnTimestampLtzWatermark(): Unit = { + val zoneId = "Asia/Shanghai" + tEnv.getConfig.setLocalTimeZone(ZoneId.of(zoneId)) val ddl = s""" - |CREATE TABLE src ( - | log_ts STRING, - | ts TIMESTAMP(3), - | a INT, - | b DOUBLE, - | WATERMARK FOR ts AS myFunc(ts, a) - |) WITH ( - | 'connector' = 'values', - | 'data-id' = '$dataId' - |) + |CREATE TABLE src1 ( + | log_ts STRING, + | ts BIGINT, + | a INT, + | b DOUBLE, + | ltz_ts AS TO_TIMESTAMP_LTZ(ts, 3), + | WATERMARK FOR ltz_ts AS ltz_ts - INTERVAL '0.001' SECOND + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$ltzDataId' + |) """.stripMargin val query = """ - |SELECT TUMBLE_END(ts, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) - |FROM src - |GROUP BY TUMBLE(ts, INTERVAL '0.003' SECOND) + |SELECT TUMBLE_END(ltz_ts, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) + |FROM src1 + |GROUP BY 
TUMBLE(ltz_ts, INTERVAL '0.003' SECOND) """.stripMargin tEnv.executeSql(ddl) - val sink = new TestingAppendSink() + val sink = new TestingAppendSink(TimeZone.getTimeZone(zoneId)) tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink) env.execute("SQL JOB") val expected = Seq( - "1970-01-01T00:00:00.003,2,3.0", - "1970-01-01T00:00:00.006,2,7.0", - "1970-01-01T00:00:00.009,2,6.0", - "1970-01-01T00:00:00.018,1,4.0") + "1970-01-01T08:00:00.003,2,3.0", + "1970-01-01T08:00:00.006,2,7.0", + "1970-01-01T08:00:00.009,2,6.0", + "1970-01-01T08:00:00.018,1,4.0") assertEquals(expected.sorted, sink.getAppendResults.sorted) - assertTrue(JavaFunc5.openCalled) - assertTrue(JavaFunc5.closeCalled) } - @Test - def testWindowAggregateOnComputedRowtime(): Unit = { - val ddl = - s""" - |CREATE TABLE src ( - | log_ts STRING, - | ts TIMESTAMP(3), - | a INT, - | b DOUBLE, - | rowtime AS CAST(log_ts AS TIMESTAMP(3)), - | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND - |) WITH ( - | 'connector' = 'values', - | 'data-id' = '$dataId' - |) - """.stripMargin - val query = - """ - |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) - |FROM src - |GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) - """.stripMargin - tEnv.executeSql(ddl) - val sink = new TestingAppendSink() - tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink) - env.execute("SQL JOB") - - val expected = Seq( - "1970-01-01T00:00:00.003,2,3.0", - "1970-01-01T00:00:00.006,2,7.0", - "1970-01-01T00:00:00.009,2,6.0", - "1970-01-01T00:00:00.018,1,4.0") - assertEquals(expected.sorted, sink.getAppendResults.sorted) - } +// @Test +// def testWindowAggregateOnCustomizedWatermark(): Unit = { Review comment: Why remove these 2 tests? 
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ########## @@ -519,18 +520,19 @@ private static void validateColumnsAndWatermarkSpecs( String.format( "Rowtime attribute '%s' is not defined in schema.", rowtimeAttribute))); - if (rowtimeType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) { + if (!(rowtimeType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE + || rowtimeType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { throw new ValidationException( String.format( - "Rowtime attribute '%s' must be of type TIMESTAMP but is of type '%s'.", + "Rowtime attribute '%s' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type '%s'.", rowtimeAttribute, rowtimeType)); } LogicalType watermarkOutputType = watermark.getWatermarkExprOutputType().getLogicalType(); - if (watermarkOutputType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) { + if (!supportedWatermarkType(watermarkOutputType)) { Review comment: I think we can and we should just compare equality of `watermarkOutputType` and `rowtimeType`. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.TimeZone; + +/** Time util to deals window start and end in different timezone. */ +@Internal +public class TimeWindowUtil { + + private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId(); + + private static final long SECONDS_PER_HOUR = 60 * 60L; + + private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L; + + /** + * Convert a epoch mills to timestamp mills which can describe a locate date time. + * + * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the + * timestamp mills is 8 * 60 * 60 * 1000 + 5. + * + * @param epochMills the epoch mills. + * @param shiftTimeZone the timezone that the given timestamp mills has been shifted. + * @return the mills which can describe the local timestamp string in given timezone. + */ + public static long toUtcTimestampMills(long epochMills, ZoneId shiftTimeZone) { + if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == epochMills) { + return epochMills; + } + LocalDateTime localDateTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), shiftTimeZone); + return localDateTime.atZone(UTC_ZONE_ID).toInstant().toEpochMilli(); + } + + /** + * Get a timer time according to the timestamp mills and the given shift timezone. + * + * @param utcTimestampMills the timestamp mills. + * @param shiftTimeZone the timezone that the given timestamp mills has been shifted. + * @return the epoch mills. 
+ */ + public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTimeZone) { + if (UTC_ZONE_ID.equals(shiftTimeZone.equals(shiftTimeZone)) + || Long.MAX_VALUE == utcTimestampMills) { Review comment: ditto: add a comment to explain the special logic. Also, `UTC_ZONE_ID.equals(shiftTimeZone.equals(shiftTimeZone))` compares a `ZoneId` with a `Boolean`, so it is always false and the UTC shortcut is never taken; it should be `UTC_ZONE_ID.equals(shiftTimeZone)`. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.TimeZone; + +/** Time util to deals window start and end in different timezone.
*/ +@Internal +public class TimeWindowUtil { + + private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId(); + + private static final long SECONDS_PER_HOUR = 60 * 60L; + + private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L; + + /** + * Convert a epoch mills to timestamp mills which can describe a locate date time. + * + * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the + * timestamp mills is 8 * 60 * 60 * 1000 + 5. + * + * @param epochMills the epoch mills. + * @param shiftTimeZone the timezone that the given timestamp mills has been shifted. + * @return the mills which can describe the local timestamp string in given timezone. + */ + public static long toUtcTimestampMills(long epochMills, ZoneId shiftTimeZone) { + if (UTC_ZONE_ID.equals(shiftTimeZone) || Long.MAX_VALUE == epochMills) { Review comment: Add comment to explain the special logic. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/UnionTest.scala ########## @@ -63,4 +110,31 @@ class UnionTest extends TableTestBase { util.verifyRelPlanWithType(sqlQuery) } + @Test + def testUnionDiffRowTime(): Unit = { + expectedException.expectMessage("Union fields with time attributes have different types") Review comment: nit: would be better to add more information, e.g. the time attributes are... ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java ########## @@ -52,13 +54,13 @@ public static boolean isTimePoint(LogicalType type) { } public static boolean isRowTime(LogicalType type) { - return type instanceof TimestampType - && ((TimestampType) type).getKind() == TimestampKind.ROWTIME; + return (type instanceof TimestampType || type instanceof LocalZonedTimestampType) + && isTimeAttribute(type); Review comment: Should be `isRowtimeAttribute`? 
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala ########## @@ -676,7 +714,19 @@ class RexTimeIndicatorMaterializer( // extraction case FINAL if updatedCall.getOperands.size() == 1 && isMatchTimeIndicator(updatedCall.getOperands.get(0)) => - updatedCall + val rowtimeType = updatedCall.getOperands.get(0).getType + updatedCall.clone( + rowtimeType, + materializedOperands) + + case FlinkSqlOperatorTable.MATCH_ROWTIME if isTimeIndicatorType(updatedCall.getType) => + val rowtimeType = input.filter(isTimeIndicatorType).head + Review comment: Please remove the empty line and add comments. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java ########## @@ -226,11 +225,10 @@ private void validateTimeColumn(String columnName, List<Column> columns) { } private void validateWatermarkExpression(LogicalType watermarkType) { - if (!hasRoot(watermarkType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) - || getPrecision(watermarkType) != 3) { + if (!supportedWatermarkType(watermarkType) || getPrecision(watermarkType) != 3) { Review comment: Please also remember to validate that the watermark output type is the same as the rowtime type, and please add tests for this. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java ########## @@ -109,15 +111,38 @@ public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily famil } public static boolean isTimeAttribute(LogicalType logicalType) { - return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != TimestampKind.REGULAR; + return hasFamily(logicalType, TIMESTAMP) Review comment: Just check the type root here to be stricter.
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala ########## @@ -364,6 +364,69 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState assertEquals(expected.sorted, sink.getAppendResults.sorted) } + @Test + def testWindowedGroupingAppliedToMatchRecognizeOnLtzRowtime(): Unit = { Review comment: How can we get deterministic results in every time zone for a TIMESTAMP_LTZ rowtime? ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala ########## @@ -676,7 +714,19 @@ class RexTimeIndicatorMaterializer( // extraction case FINAL if updatedCall.getOperands.size() == 1 Review comment: Please update the comments about `FINAL(MATCH_ROWTIME)`. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java ########## @@ -213,11 +213,10 @@ private void validateTimeColumn(String columnName, List<Column> columns) { columns.stream().map(Column::getName).collect(Collectors.toList()))); } final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType(); - if (!hasRoot(timeFieldType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) - || getPrecision(timeFieldType) != 3) { + if (!supportedWatermarkType(timeFieldType) || getPrecision(timeFieldType) != 3) { Review comment: `supportedWatermarkType` reads oddly here, because a time field is, of course, not a watermark. I think you want to use `canBeTimeAttributeType` here instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org