This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new 5f666c3 [FLINK-22349][table-api] Throw Exception for unsupported Zone ID instead of using wrong value 5f666c3 is described below commit 5f666c3731e5c41e06e1cc759fb5c2eb550df6f0 Author: Leonard Xu <xbjt...@163.com> AuthorDate: Wed Apr 21 17:55:39 2021 +0800 [FLINK-22349][table-api] Throw Exception for unsupported Zone ID instead of using wrong value This closes #15678 --- .../generated/table_config_configuration.html | 2 +- .../org/apache/flink/table/api/TableConfig.java | 18 ++++++++++++ .../flink/table/api/config/TableConfigOptions.java | 2 +- .../apache/flink/table/api/TableConfigTest.java | 33 ++++++++++++++++++++++ .../table/util/python/PythonTableUtilsTest.scala | 10 +++---- .../window/assigners/TumblingWindowAssigner.java | 2 +- .../flink/table/runtime/util/TimeWindowUtil.java | 2 +- .../window/SlicingWindowAggOperatorTest.java | 4 +-- 8 files changed, 62 insertions(+), 11 deletions(-) diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html b/docs/layouts/shortcodes/generated/table_config_configuration.html index 7d9b938..ff14b8b 100644 --- a/docs/layouts/shortcodes/generated/table_config_configuration.html +++ b/docs/layouts/shortcodes/generated/table_config_configuration.html @@ -30,7 +30,7 @@ <td><h5>table.local-time-zone</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">"default"</td> <td>String</td> - <td>The local time zone defines current session time zone id. It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The input of option is either an abbreviation such as "PST", a full name suc [...] + <td>The local time zone defines current session time zone id. It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. Internally, timestamps with local time zone are always represented in the UTC time zone. However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion. The input of option is either a full name such as "America/Los_Angeles", or [...] </tr> <tr> <td><h5>table.planner</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java index 83b10bf..2e0ec26 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java @@ -34,6 +34,8 @@ import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import static java.time.ZoneId.SHORT_IDS; + /** * Configuration for the current {@link TableEnvironment} session to adjust Table & SQL API * programs. @@ -113,6 +115,7 @@ public class TableConfig { */ public ZoneId getLocalTimeZone() { String zone = configuration.getString(TableConfigOptions.LOCAL_TIME_ZONE); + validateTimeZone(zone); return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) ? ZoneId.systemDefault() : ZoneId.of(zone); @@ -166,9 +169,24 @@ public class TableConfig { * @see org.apache.flink.table.types.logical.LocalZonedTimestampType */ public void setLocalTimeZone(ZoneId zoneId) { + validateTimeZone(zoneId.toString()); configuration.setString(TableConfigOptions.LOCAL_TIME_ZONE, zoneId.toString()); } + /** Validates user configured time zone. */ + private void validateTimeZone(String zone) { + final String zoneId = zone.toUpperCase(); + if (zoneId.startsWith("UTC+") + || zoneId.startsWith("UTC-") + || SHORT_IDS.containsKey(zoneId)) { + throw new IllegalArgumentException( + String.format( + "The supported Zone ID is either a full name such as 'America/Los_Angeles'," + + " or a custom timezone id such as 'GMT-8:00', but configured Zone ID is '%s'.", + zone)); + } + } + /** Returns the NULL check. If enabled, all fields need to be checked for NULL first. */ public Boolean getNullCheck() { return nullCheck; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java index 59ad2d3..9485d8d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java @@ -89,7 +89,7 @@ public class TableConfigOptions { "The local time zone defines current session time zone id. It is used when converting to/from " + "<code>TIMESTAMP WITH LOCAL TIME ZONE</code>. Internally, timestamps with local time zone are always represented in the UTC time zone. " + "However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), " - + "the session time zone is used during conversion. The input of option is either an abbreviation such as \"PST\", a full name " + + "the session time zone is used during conversion. The input of option is either a full name " + "such as \"America/Los_Angeles\", or a custom timezone id such as \"GMT-8:00\"."); @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java index 6ad45dc..6b22dee 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java @@ -20,7 +20,9 @@ package org.apache.flink.table.api; import org.apache.flink.configuration.Configuration; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.time.Duration; import java.time.ZoneId; @@ -29,6 +31,9 @@ import static org.junit.Assert.assertEquals; /** Tests for {@link TableConfig}. */ public class TableConfigTest { + + @Rule public ExpectedException expectedException = ExpectedException.none(); + private static TableConfig configByMethod = new TableConfig(); private static TableConfig configByConfiguration = new TableConfig(); private static Configuration configuration = new Configuration(); @@ -64,6 +69,34 @@ public class TableConfigTest { } @Test + public void testSetInvalidLocalTimeZone() { + expectedException.expectMessage( + "The supported Zone ID is either a full name such as 'America/Los_Angeles'," + + " or a custom timezone id such as 'GMT-8:00', but configured Zone ID is 'UTC-10:00'."); + configByMethod.setLocalTimeZone(ZoneId.of("UTC-10:00")); + } + + @Test + public void testGetInvalidLocalTimeZone() { + configuration.setString("table.local-time-zone", "UTC+8"); + configByConfiguration.addConfiguration(configuration); + expectedException.expectMessage( + "The supported Zone ID is either a full name such as 'America/Los_Angeles'," + + " or a custom timezone id such as 'GMT-8:00', but configured Zone ID is 'UTC+8'."); + configByConfiguration.getLocalTimeZone(); + } + + @Test + public void testGetInvalidAbbreviationLocalTimeZone() { + configuration.setString("table.local-time-zone", "PST"); + configByConfiguration.addConfiguration(configuration); + expectedException.expectMessage( + "The supported Zone ID is either a full name such as 'America/Los_Angeles'," + + " or a custom timezone id such as 'GMT-8:00', but configured Zone ID is 'PST'."); + configByConfiguration.getLocalTimeZone(); + } + + @Test public void testSetAndGetIdleStateRetention() { configuration.setString("table.exec.state.ttl", "1 h"); configByConfiguration.addConfiguration(configuration); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/util/python/PythonTableUtilsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/util/python/PythonTableUtilsTest.scala index 822e748..2c1d473 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/util/python/PythonTableUtilsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/util/python/PythonTableUtilsTest.scala @@ -36,18 +36,18 @@ class PythonTableUtilsTest { val originalZone = TimeZone.getDefault try { // Daylight Saving Time Test - TimeZone.setDefault(TimeZone.getTimeZone(ZoneId.of("PST", ZoneId.SHORT_IDS))) + TimeZone.setDefault(TimeZone.getTimeZone(ZoneId.of("America/Los_Angeles"))) - // 2018-03-11 01:59:59.0 PST + // 2018-03-11 01:59:59.0 America/Los_Angeles testOffset(DateTimeUtils.timestampStringToUnixDate("2018-03-11 01:59:59.0"), -28800000) - // 2018-03-11 03:00:00.0 PST + // 2018-03-11 03:00:00.0 America/Los_Angeles testOffset(DateTimeUtils.timestampStringToUnixDate("2018-03-11 03:00:00.0"), -25200000) - // 2018-11-04 00:59:59.0 PST + // 2018-11-04 00:59:59.0 America/Los_Angeles testOffset(DateTimeUtils.timestampStringToUnixDate("2018-11-04 00:59:59.0"), -25200000) - // 2018-11-04 02:00:00.0 PST + // 2018-11-04 02:00:00.0 America/Los_Angeles testOffset(DateTimeUtils.timestampStringToUnixDate("2018-11-04 02:00:00.0"), -28800000) } finally { TimeZone.setDefault(originalZone) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/assigners/TumblingWindowAssigner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/assigners/TumblingWindowAssigner.java index afd532a..9925688 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/assigners/TumblingWindowAssigner.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/assigners/TumblingWindowAssigner.java @@ -100,7 +100,7 @@ public class TumblingWindowAssigner extends WindowAssigner<TimeWindow> * windows start at 0:15:00,1:15:00,2:15:00,etc. * * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, such as - * China which is using UTC+08:00,and you want a time window with size of one day, and window + * China which is using GMT+08:00,and you want a time window with size of one day, and window * begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than * UTC time. diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java index 88db051..0fd2094 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java @@ -43,7 +43,7 @@ public class TimeWindowUtil { /** * Convert a epoch mills to timestamp mills which can describe a locate date time. * - * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the + * <p>For example: The timestamp string of epoch mills 5 in GMT+8:00 is 1970-01-01 08:00:05, the * timestamp mills is 8 * 60 * 60 * 1000 + 5. * * @param epochMills the epoch mills. diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java index e0db37c..483979c 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java @@ -692,8 +692,8 @@ public class SlicingWindowAggOperatorTest { final SliceAssigner assigner = SliceAssigners.tumbling(-1, shiftTimeZone, Duration.ofHours(5)); - // the assigned windows should like as following, e.g. the given timeZone is UTC+8: - // local windows(timestamp in UTC+8) <=> epoch windows(timestamp in UTC+0) + // the assigned windows should like as following, e.g. the given timeZone is GMT+8:00: + // local windows(timestamp in GMT+8:00) <=> epoch windows(timestamp in UTC) // [1970-01-01 00:00, 1970-01-01 05:00] <=> [1969-12-31 16:00, 1969-12-31 21:00] // [1970-01-01 05:00, 1970-01-01 10:00] <=> [1969-12-31 21:00, 1970-01-01 02:00]