This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 740ad3d  [FLINK-22356][hive][filesystem] Fix partition-time commit 
failure when watermark is applied defined TIMESTAMP_LTZ column
740ad3d is described below

commit 740ad3df3ff6a33cb33223a71e174f989b5b04aa
Author: Leonard Xu <xbjt...@gmail.com>
AuthorDate: Sat Apr 24 00:26:10 2021 +0800

    [FLINK-22356][hive][filesystem] Fix partition-time commit failure when 
watermark is applied defined TIMESTAMP_LTZ column
    
    This closes #15709
---
 .../content.zh/docs/connectors/table/filesystem.md |  50 ++++++++-
 .../docs/connectors/table/hive/hive_read_write.md  |  37 ++++++-
 docs/content/docs/connectors/table/filesystem.md   |  48 +++++++-
 .../docs/connectors/table/hive/hive_read_write.md  |  37 ++++++-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 121 +++++++++++++++++----
 .../flink/table/filesystem/FileSystemOptions.java  |  13 +++
 .../table/filesystem/FileSystemTableFactory.java   |  17 +++
 .../stream/PartitionTimeCommitTrigger.java         |  23 +++-
 .../filesystem/FileSystemTableFactoryTest.java     |  27 +++++
 9 files changed, 339 insertions(+), 34 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/filesystem.md 
b/docs/content.zh/docs/connectors/table/filesystem.md
index cfee0bd..b3dfeb2 100644
--- a/docs/content.zh/docs/connectors/table/filesystem.md
+++ b/docs/content.zh/docs/connectors/table/filesystem.md
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition 
commit trigger:
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a 
daily partition, should be '1 d', if it is a hourly partition, should be '1 
h'.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td style="word-wrap: break-word;">UTC</td>
+        <td>String</td>
+        <td>The time zone to parse the long watermark value to TIMESTAMP 
value, the parsed watermark timestamp is used to compare with partition time to 
decide the partition should commit or not. This option is only take effect when 
`sink.partition-commit.trigger` is set to 'partition-time'. If this option is 
not configured correctly, e.g. source rowtime is defined on TIMESTAMP_LTZ 
column, but this config is not configured, then users may see the partition 
committed after a few hours. Th [...]
+    </tr>        
   </tbody>
 </table>
 
@@ -401,7 +407,7 @@ The parallelism of writing files into external file system 
(including Hive) can
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out.
+The below examples show how the file system connector can be used to write a 
streaming query to write data from Kafka into a file system and runs a batch 
query to read that data back out.
 
 ```sql
 
@@ -409,7 +415,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP column
 ) WITH (...);
 
 CREATE TABLE fs_table (
@@ -438,4 +444,44 @@ FROM kafka_table;
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
+If the watermark is defined on TIMESTAMP_LTZ column and used `partition-time` 
to commit, the `sink.partition-commit.watermark-time-zone` is required to set 
to the session time zone, otherwise the partition committed may happen after a 
few hours.  
+```sql
+
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP_LTZ column
+) WITH (...);
+
+CREATE TABLE fs_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  dt STRING,
+  `hour` STRING
+) PARTITIONED BY (dt, `hour`) WITH (
+  'connector'='filesystem',
+  'path'='...',
+  'format'='parquet',
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user 
configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='success-file'
+);
+
+-- streaming sql, insert into file system table
+INSERT INTO fs_table 
+SELECT 
+    user_id, 
+    order_amount, 
+    DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
+    DATE_FORMAT(ts_ltz, 'HH') 
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
+```
+
 {{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md 
b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index d0efdeb..6d45f8d 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -346,7 +346,7 @@ Flink SQL> INSERT OVERWRITE myparttable PARTITION 
(my_type='type_1') SELECT 'Tom
 visible - incrementally. Users control when/how to trigger commits with 
several properties. Insert
 overwrite is not supported for streaming write.
 
-The below shows how the streaming sink can be used to write a streaming query 
to write data from Kafka into a Hive table with partition-commit,
+The below examples show how the streaming sink can be used to write a 
streaming query to write data from Kafka into a Hive table with 
partition-commit,
 and runs a batch query to read that data back out. 
 
 Please see the [streaming sink]({{< ref "docs/connectors/table/filesystem" 
>}}#streaming-sink) for a full list of available configurations.
@@ -369,7 +369,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP column
 ) WITH (...);
 
 -- streaming sql, insert into hive table
@@ -382,6 +382,39 @@ SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
 
 ```
 
+If the watermark is defined on TIMESTAMP_LTZ column and used `partition-time` 
to commit, the `sink.partition-commit.watermark-time-zone` is required to set 
to the session time zone, otherwise the partition committed may happen after a 
few hours.  
+```sql
+
+SET table.sql-dialect=hive;
+CREATE TABLE hive_table (
+  user_id STRING,
+  order_amount DOUBLE
+) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
+  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user 
configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='metastore,success-file'
+);
+
+SET table.sql-dialect=default;
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP_LTZ column
+) WITH (...);
+
+-- streaming sql, insert into hive table
+INSERT INTO TABLE hive_table 
+SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), 
DATE_FORMAT(ts_ltz, 'HH')
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
+
+```
 
 By default, for streaming writes, Flink only supports renaming committers, 
meaning the S3 filesystem
 cannot support exactly-once streaming writes.
diff --git a/docs/content/docs/connectors/table/filesystem.md 
b/docs/content/docs/connectors/table/filesystem.md
index e9a1d29..d9c53b4 100644
--- a/docs/content/docs/connectors/table/filesystem.md
+++ b/docs/content/docs/connectors/table/filesystem.md
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition 
commit trigger:
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a 
daily partition, should be '1 d', if it is a hourly partition, should be '1 
h'.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td style="word-wrap: break-word;">UTC</td>
+        <td>String</td>
+        <td>The time zone to parse the long watermark value to TIMESTAMP 
value, the parsed watermark timestamp is used to compare with partition time to 
decide the partition should commit or not. This option is only take effect when 
`sink.partition-commit.trigger` is set to 'partition-time'. If this option is 
not configured correctly, e.g. source rowtime is defined on TIMESTAMP_LTZ 
column, but this config is not configured, then users may see the partition 
committed after a few hours. Th [...]
+    </tr>    
   </tbody>
 </table>
 
@@ -401,7 +407,7 @@ The parallelism of writing files into external file system 
(including Hive) can
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out.
+The below examples show how the file system connector can be used to write a 
streaming query to write data from Kafka into a file system and runs a batch 
query to read that data back out.
 
 ```sql
 
@@ -438,4 +444,44 @@ FROM kafka_table;
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
+If the watermark is defined on TIMESTAMP_LTZ column and used `partition-time` 
to commit, the `sink.partition-commit.watermark-time-zone` is required to set 
to the session time zone, otherwise the partition committed may happen after a 
few hours.  
+```sql
+
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP_LTZ column
+) WITH (...);
+
+CREATE TABLE fs_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  dt STRING,
+  `hour` STRING
+) PARTITIONED BY (dt, `hour`) WITH (
+  'connector'='filesystem',
+  'path'='...',
+  'format'='parquet',
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user 
configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='success-file'
+);
+
+-- streaming sql, insert into file system table
+INSERT INTO fs_table 
+SELECT 
+    user_id, 
+    order_amount, 
+    DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
+    DATE_FORMAT(ts_ltz, 'HH') 
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
+```
+
 {{< top >}}
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md 
b/docs/content/docs/connectors/table/hive/hive_read_write.md
index d4b8407..ed44dfc 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -346,7 +346,7 @@ Flink SQL> INSERT OVERWRITE myparttable PARTITION 
(my_type='type_1') SELECT 'Tom
 visible - incrementally. Users control when/how to trigger commits with 
several properties. Insert
 overwrite is not supported for streaming write.
 
-The below shows how the streaming sink can be used to write a streaming query 
to write data from Kafka into a Hive table with partition-commit,
+The below examples show how the streaming sink can be used to write a 
streaming query to write data from Kafka into a Hive table with 
partition-commit,
 and runs a batch query to read that data back out. 
 
 Please see the [streaming sink]({{< ref "docs/connectors/table/filesystem" 
>}}#streaming-sink) for a full list of available configurations.
@@ -369,7 +369,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP column
 ) WITH (...);
 
 -- streaming sql, insert into hive table
@@ -382,6 +382,39 @@ SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
 
 ```
 
+If the watermark is defined on TIMESTAMP_LTZ column and used `partition-time` 
to commit, the `sink.partition-commit.watermark-time-zone` is required to set 
to the session time zone, otherwise the partition committed may happen after a 
few hours.  
+```sql
+
+SET table.sql-dialect=hive;
+CREATE TABLE hive_table (
+  user_id STRING,
+  order_amount DOUBLE
+) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
+  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user 
configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='metastore,success-file'
+);
+
+SET table.sql-dialect=default;
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP_LTZ column
+) WITH (...);
+
+-- streaming sql, insert into hive table
+INSERT INTO TABLE hive_table 
+SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), 
DATE_FORMAT(ts_ltz, 'HH')
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
+
+```
 
 By default, for streaming writes, Flink only supports renaming committers, 
meaning the S3 filesystem
 cannot support exactly-once streaming writes.
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index e723172..410ebe9 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
@@ -41,6 +42,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -223,6 +225,101 @@ public class HiveTableSinkITCase {
                 });
     }
 
+    @Test(timeout = 120000)
+    public void testStreamingSinkWithTimestampLtzWatermark() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        try {
+            tEnv.executeSql("create database db1");
+            tEnv.useDatabase("db1");
+            tEnv.executeSql(
+                    "create external table sink_table ("
+                            + " a int,"
+                            + " b string,"
+                            + " c string)"
+                            + " partitioned by (d string,e string)"
+                            + " stored as parquet TBLPROPERTIES ("
+                            + " 
'partition.time-extractor.timestamp-pattern'='$d $e:00:00',"
+                            + " 
'sink.partition-commit.trigger'='partition-time',"
+                            + " 'sink.partition-commit.delay'='1h',"
+                            + " 
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',"
+                            + " 
'sink.partition-commit.policy.kind'='metastore,success-file',"
+                            + " 
'sink.partition-commit.success-file.name'='_MY_SUCCESS')");
+
+            // hive dialect only works with hive tables at the moment, switch 
to default dialect
+            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+            tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+            // prepare source
+            // epoch mills 1588460400000L <=>  local timestamp 2020-05-03 
07:00:00 in Shanghai
+            // epoch mills 1588464000000L <=>  local timestamp 2020-05-03 
08:00:00 in Shanghai
+            // epoch mills 1588467600000L <=>  local timestamp 2020-05-03 
09:00:00 in Shanghai
+            // epoch mills 1588471200000L <=>  local timestamp 2020-05-03 
10:00:00 in Shanghai
+            // epoch mills 1588474800000L <=>  local timestamp 2020-05-03 
11:00:00 in Shanghai
+            List<Row> data =
+                    Arrays.asList(
+                            Row.of(1, "a", "b", "2020-05-03", "7", 
1588460400000L),
+                            Row.of(1, "a", "b", "2020-05-03", "7", 
1588460400000L),
+                            Row.of(2, "p", "q", "2020-05-03", "8", 
1588464000000L),
+                            Row.of(2, "p", "q", "2020-05-03", "8", 
1588464000000L),
+                            Row.of(3, "x", "y", "2020-05-03", "9", 
1588467600000L),
+                            Row.of(3, "x", "y", "2020-05-03", "9", 
1588467600000L),
+                            Row.of(4, "x", "y", "2020-05-03", "10", 
1588471200000L),
+                            Row.of(4, "x", "y", "2020-05-03", "10", 
1588471200000L),
+                            Row.of(5, "x", "y", "2020-05-03", "11", 
1588474800000L),
+                            Row.of(5, "x", "y", "2020-05-03", "11", 
1588474800000L));
+
+            String dataId = TestValuesTableFactory.registerData(data);
+            String sourceTableDDL =
+                    String.format(
+                            "create table my_table("
+                                    + " a INT,"
+                                    + " b STRING,"
+                                    + " c STRING,"
+                                    + " d STRING,"
+                                    + " e STRING,"
+                                    + " ts BIGINT,"
+                                    + " ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),"
+                                    + " WATERMARK FOR ts_ltz as ts_ltz"
+                                    + ") with ("
+                                    + " 'connector' = 'values',"
+                                    + " 'data-id' = '%s',"
+                                    + " 'failing-source' = 'true')",
+                            dataId);
+            tEnv.executeSql(sourceTableDDL);
+            tEnv.executeSql("insert into sink_table select a, b, c, d, e from 
my_table").await();
+
+            assertBatch(
+                    "db1.sink_table",
+                    Arrays.asList(
+                            "+I[1, a, b, 2020-05-03, 7]",
+                            "+I[1, a, b, 2020-05-03, 7]",
+                            "+I[2, p, q, 2020-05-03, 8]",
+                            "+I[2, p, q, 2020-05-03, 8]",
+                            "+I[3, x, y, 2020-05-03, 9]",
+                            "+I[3, x, y, 2020-05-03, 9]",
+                            "+I[4, x, y, 2020-05-03, 10]",
+                            "+I[4, x, y, 2020-05-03, 10]",
+                            "+I[5, x, y, 2020-05-03, 11]",
+                            "+I[5, x, y, 2020-05-03, 11]"));
+
+            this.checkSuccessFiles(
+                    URI.create(
+                                    hiveCatalog
+                                            
.getHiveTable(ObjectPath.fromString("db1.sink_table"))
+                                            .getSd()
+                                            .getLocation())
+                            .getPath());
+        } finally {
+            tEnv.executeSql("drop database db1 cascade");
+        }
+    }
+
     private void checkSuccessFiles(String path) {
         File basePath = new File(path, "d=2020-05-03");
         Assert.assertEquals(5, basePath.list().length);
@@ -318,30 +415,6 @@ public class HiveTableSinkITCase {
                             "+I[5, x, y, 2020-05-03, 11]",
                             "+I[5, x, y, 2020-05-03, 11]"));
 
-            // using batch table env to query.
-            List<String> results = new ArrayList<>();
-            TableEnvironment batchTEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-            batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-            batchTEnv.useCatalog(hiveCatalog.getName());
-            batchTEnv
-                    .executeSql("select * from db1.sink_table")
-                    .collect()
-                    .forEachRemaining(r -> results.add(r.toString()));
-            results.sort(String::compareTo);
-            Assert.assertEquals(
-                    Arrays.asList(
-                            "+I[1, a, b, 2020-05-03, 7]",
-                            "+I[1, a, b, 2020-05-03, 7]",
-                            "+I[2, p, q, 2020-05-03, 8]",
-                            "+I[2, p, q, 2020-05-03, 8]",
-                            "+I[3, x, y, 2020-05-03, 9]",
-                            "+I[3, x, y, 2020-05-03, 9]",
-                            "+I[4, x, y, 2020-05-03, 10]",
-                            "+I[4, x, y, 2020-05-03, 10]",
-                            "+I[5, x, y, 2020-05-03, 11]",
-                            "+I[5, x, y, 2020-05-03, 11]"),
-                    results);
-
             pathConsumer.accept(
                     URI.create(
                                     hiveCatalog
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
index 7344dad..0f985d6 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
@@ -190,6 +190,19 @@ public class FileSystemOptions {
                                     + " if it is a day partition, should be '1 
d',"
                                     + " if it is a hour partition, should be 
'1 h'");
 
+    public static final ConfigOption<String> 
SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE =
+            key("sink.partition-commit.watermark-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription(
+                            "The time zone to parse the long watermark value 
to TIMESTAMP value,"
+                                    + " the parsed watermark timestamp is used 
to compare with partition time"
+                                    + " to decide the partition should commit 
or not."
+                                    + " The default value is 'UTC', which 
means the watermark is defined on TIMESTAMP column or not defined."
+                                    + " If the watermark is defined on 
TIMESTAMP_LTZ column, the time zone of watermark is user configured time zone,"
+                                    + " the the value should be the user 
configured local time zone. The option value is either a full name"
+                                    + " such as 'America/Los_Angeles', or a 
custom timezone id such as 'GMT-8:00'.");
+
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND 
=
             key("sink.partition-commit.policy.kind")
                     .stringType()
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
index 2039247..49eb235 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
@@ -45,6 +45,8 @@ import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.time.ZoneId.SHORT_IDS;
+
 /**
  * File system {@link TableFactory}.
  *
@@ -106,6 +108,7 @@ public class FileSystemTableFactory implements 
DynamicTableSourceFactory, Dynami
         
options.add(FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN);
         options.add(FileSystemOptions.SINK_PARTITION_COMMIT_TRIGGER);
         options.add(FileSystemOptions.SINK_PARTITION_COMMIT_DELAY);
+        
options.add(FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE);
         options.add(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
         options.add(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_CLASS);
         options.add(FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME);
@@ -119,6 +122,20 @@ public class FileSystemTableFactory implements 
DynamicTableSourceFactory, Dynami
         // Except format options, some formats like parquet and orc can not 
list all supported
         // options.
         helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + 
".");
+
+        // validate time zone of watermark
+        String watermarkTimeZone =
+                helper.getOptions()
+                        
.get(FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE);
+        if (watermarkTimeZone.startsWith("UTC+")
+                || watermarkTimeZone.startsWith("UTC-")
+                || SHORT_IDS.containsKey(watermarkTimeZone)) {
+            throw new ValidationException(
+                    String.format(
+                            "The supported watermark time zone is either a 
full name such as 'America/Los_Angeles',"
+                                    + " or a custom time zone id such as 
'GMT-8:00', but configured time zone is '%s'.",
+                            watermarkTimeZone));
+        }
     }
 
     private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> 
discoverDecodingFormat(
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionTimeCommitTrigger.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionTimeCommitTrigger.java
index 035b94b..0c53372 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionTimeCommitTrigger.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/PartitionTimeCommitTrigger.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.table.filesystem.PartitionTimeExtractor;
 import org.apache.flink.util.StringUtils;
 
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,11 +41,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
-import static 
org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_DELAY;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE;
 import static 
org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues;
 
 /**
@@ -73,6 +74,8 @@ public class PartitionTimeCommitTrigger implements 
PartitionCommitTrigger {
     private final PartitionTimeExtractor extractor;
     private final long commitDelay;
     private final List<String> partitionKeys;
+    /** The time zone used to parse the long watermark value to TIMESTAMP. */
+    private final ZoneId watermarkTimeZone;
 
     public PartitionTimeCommitTrigger(
             boolean isRestored,
@@ -98,6 +101,8 @@ public class PartitionTimeCommitTrigger implements 
PartitionCommitTrigger {
 
         this.watermarksState = stateStore.getListState(WATERMARKS_STATE_DESC);
         this.watermarks = new TreeMap<>();
+        this.watermarkTimeZone =
+                
ZoneId.of(conf.getString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
         if (isRestored) {
             watermarks.putAll(watermarksState.get().iterator().next());
         }
@@ -126,9 +131,9 @@ public class PartitionTimeCommitTrigger implements 
PartitionCommitTrigger {
         Iterator<String> iter = pendingPartitions.iterator();
         while (iter.hasNext()) {
             String partition = iter.next();
-            LocalDateTime partTime =
+            LocalDateTime partitionTime =
                     extractor.extract(partitionKeys, 
extractPartitionValues(new Path(partition)));
-            if (watermark > toMills(partTime) + commitDelay) {
+            if (watermarkHasPassedWithDelay(watermark, partitionTime, 
commitDelay)) {
                 needCommit.add(partition);
                 iter.remove();
             }
@@ -136,6 +141,18 @@ public class PartitionTimeCommitTrigger implements 
PartitionCommitTrigger {
         return needCommit;
     }
 
+    /**
+     * Returns the watermark has passed the partition time or not, if true 
means it's time to commit
+     * the partition.
+     */
+    private boolean watermarkHasPassedWithDelay(
+            long watermark, LocalDateTime partitionTime, long commitDelay) {
+        // here we don't parse the long watermark to TIMESTAMP and then 
comparision,
+        // but parse the partition timestamp to epoch mills to avoid Daylight 
Saving Time issue
+        long epochPartTime = 
partitionTime.atZone(watermarkTimeZone).toInstant().toEpochMilli();
+        return watermark > epochPartTime + commitDelay;
+    }
+
     @Override
     public void snapshotState(long checkpointId, long watermark) throws 
Exception {
         pendingPartitionsState.clear();
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableFactoryTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableFactoryTest.java
index 0df05c2..9bc33a9 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableFactoryTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableFactoryTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -147,6 +148,32 @@ public class FileSystemTableFactoryTest {
     }
 
     @Test
+    public void testUnsupportedWatermarkTimeZone() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "csv");
+        descriptor.putString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), 
"UTC+8");
+
+        try {
+            createTableSource(SCHEMA, descriptor.asMap());
+        } catch (ValidationException e) {
+            Throwable cause = e.getCause();
+            assertTrue(cause.toString(), cause instanceof ValidationException);
+            assertTrue(
+                    cause.getMessage(),
+                    cause.getMessage()
+                            .contains(
+                                    "The supported watermark time zone is 
either a full name such "
+                                            + "as 'America/Los_Angeles', or a 
custom time zone id such "
+                                            + "as 'GMT-8:00', but configured 
time zone is 'UTC+8'."));
+            return;
+        }
+
+        fail("Should fail by ValidationException.");
+    }
+
+    @Test
     public void testNoFormatFactoryFound() {
         DescriptorProperties descriptor = new DescriptorProperties();
         descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");

Reply via email to