[flink] branch master updated: [FLINK-22444][docs] Drop async checkpoint description of state backends in Chinese docs

2021-04-25 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f9d3245  [FLINK-22444][docs] Drop async checkpoint description of 
state backends in Chinese docs
f9d3245 is described below

commit f9d3245ade6f0ffb1be633148bd97c4c2aabc2a2
Author: Yun Tang 
AuthorDate: Sun Apr 25 10:50:50 2021 +0800

[FLINK-22444][docs] Drop async checkpoint description of state backends in 
Chinese docs
---
 docs/content.zh/docs/ops/state/state_backends.md | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/docs/content.zh/docs/ops/state/state_backends.md 
b/docs/content.zh/docs/ops/state/state_backends.md
index e978abf..d1343f6 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -55,13 +55,6 @@ Flink ships with the following state backends out of the box:
 
 During checkpointing, the state backend takes a snapshot of the state and sends it to the JobManager (master) as part of the checkpoint acknowledgement message; the JobManager also keeps the snapshot in its heap memory.
 
-The MemoryStateBackend can be configured to take snapshots asynchronously. Asynchronous snapshots are strongly encouraged to avoid blocking the data stream; note that they are enabled by default.
-Users can disable asynchronous snapshots (for debugging only) by setting the corresponding boolean constructor argument to `false` when instantiating the `MemoryStateBackend`, for example:
-
-```java
-new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
-```
-
 Limitations of the MemoryStateBackend:
 
   - By default, the size of each individual state is limited to 5 MB. This limit can be raised in the constructor of the MemoryStateBackend.
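For illustration, a hedged Java sketch of raising that limit through the constructor (the 10 MB figure is invented; `MemoryStateBackend(int maxStateSize)` takes the size in bytes):

```java
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: raise the per-state limit from the 5 MB default to 10 MB.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024));
```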
@@ -82,13 +75,6 @@ The MemoryStateBackend is suitable for:
 
 The FsStateBackend holds in-flight state data in the TaskManager's memory. On checkpoints, it writes state snapshots into the configured file system directory.
 A small amount of metadata is stored in the JobManager's memory (in high-availability mode, it is written into the checkpoint metadata file instead).
 
-The FsStateBackend uses asynchronous snapshots by default to keep checkpoint writes from blocking data processing.
-Users can disable asynchronous snapshots by setting the corresponding boolean constructor argument to `false` when instantiating the `FsStateBackend`, for example:
-
-```java
-new FsStateBackend(path, false);
-```
-
 The FsStateBackend is suitable for:
 
   - Jobs with large state, long windows, or large key/value states.
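For context, a minimal sketch of selecting this backend (the checkpoint URI is a placeholder):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: working state stays on the TaskManager heap; snapshots go to the URI below.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
```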
@@ -108,8 +94,6 @@ Unlike storing java objects in `HashMapStateBackend`, data is stored as serialized byte arrays
 On checkpoints, the whole RocksDB database is checkpointed into the configured file system directory.
 A small amount of metadata is stored in the JobManager's memory (in high-availability mode, it is stored in the checkpoint metadata file instead).
 
-The RocksDBStateBackend only supports asynchronous snapshots.
-
 Limitations of the RocksDBStateBackend:
 
   - Since RocksDB's JNI API is built on top of the byte[] data structure, each key and each value can be at most 2^31 bytes.
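For context, a minimal sketch of enabling this backend (requires the `flink-statebackend-rocksdb` dependency; the URI is a placeholder, and the constructor declares `IOException`):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: RocksDB keeps working state on local disk and checkpoints the whole
// database (always asynchronously) to the configured directory.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
```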


[flink] branch master updated (075b0b2 -> 664b29d)

2021-04-25 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 075b0b2  [FLINK-19449][table-planner] LEAD/LAG cannot work correctly 
in streaming mode
 add 664b29d  [FLINK-18199][doc] translate FileSystem SQL Connector page 
into chinese

No new revisions were added by this update.

Summary of changes:
 .../content.zh/docs/connectors/table/filesystem.md | 234 ++---
 1 file changed, 115 insertions(+), 119 deletions(-)


[flink-statefun] branch release-3.0 updated (1ae409e -> 71e8de7)

2021-04-25 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-3.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


from 1ae409e  [hotfix] Add missing configuration to documentation
 new 1b98248  [hotfix][docs] Fix typos in apache-kafka.md and aws-kinesis.md
 new 71e8de7  [hotfix][docs] Fix typos in concepts

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/docs/concepts/application-building-blocks.md | 6 +++---
 docs/content/docs/concepts/distributed_architecture.md | 4 ++--
 docs/content/docs/concepts/logical.md | 6 +++---
 docs/content/docs/io-module/apache-kafka.md   | 2 +-
 docs/content/docs/io-module/aws-kinesis.md| 2 +-
 5 files changed, 10 insertions(+), 10 deletions(-)


[flink-statefun] 01/02: [hotfix][docs] Fix typos in apache-kafka.md and aws-kinesis.md

2021-04-25 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-3.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1b9824831c06acc03d7995d875e9862b2cb57f9b
Author: ariskk 
AuthorDate: Sat Apr 24 11:04:14 2021 +0100

[hotfix][docs] Fix typos in apache-kafka.md and aws-kinesis.md
---
 docs/content/docs/io-module/apache-kafka.md | 2 +-
 docs/content/docs/io-module/aws-kinesis.md  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/io-module/apache-kafka.md 
b/docs/content/docs/io-module/apache-kafka.md
index a700f17..e0b4653 100644
--- a/docs/content/docs/io-module/apache-kafka.md
+++ b/docs/content/docs/io-module/apache-kafka.md
@@ -75,7 +75,7 @@ startupPosition:
   type: group-offsets
 ```
 
- Earlist
+ Earliest
 
 Starts from the earliest offset.
 
diff --git a/docs/content/docs/io-module/aws-kinesis.md 
b/docs/content/docs/io-module/aws-kinesis.md
index c288d44..c8947c1 100644
--- a/docs/content/docs/io-module/aws-kinesis.md
+++ b/docs/content/docs/io-module/aws-kinesis.md
@@ -82,7 +82,7 @@ startupPosition:
   type: latest
 ```
 
- Earlist
+ Earliest
 
 Start consuming from the earliest position possible.
 


[flink-statefun] 02/02: [hotfix][docs] Fix typos in concepts

2021-04-25 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-3.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 71e8de7f7046cb7b65b533f1f336bdeeebeb475b
Author: ariskk 
AuthorDate: Sat Apr 24 11:31:08 2021 +0100

[hotfix][docs] Fix typos in concepts

This closes #229.
---
 docs/content/docs/concepts/application-building-blocks.md | 6 +++---
 docs/content/docs/concepts/distributed_architecture.md | 4 ++--
 docs/content/docs/concepts/logical.md | 6 +++---
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/docs/content/docs/concepts/application-building-blocks.md 
b/docs/content/docs/concepts/application-building-blocks.md
index d82ca23..996a314 100644
--- a/docs/content/docs/concepts/application-building-blocks.md
+++ b/docs/content/docs/concepts/application-building-blocks.md
@@ -26,7 +26,7 @@ under the License.
 
 # Application Building Blocks
 
-Stateful Functions provides a framework for building event drivent 
applications. Here, we explain important aspects of Stateful Function’s 
architecture.
+Stateful Functions provides a framework for building event driven 
applications. Here, we explain important aspects of Stateful Function’s 
architecture.
 
 ## Event Ingress
 
@@ -36,7 +36,7 @@ Stateful Function applications sit squarely in the event 
driven space, so the na
 
 
 In Stateful Functions, the component that ingests records into the system is 
called an event ingress.
-This can be anything from a Kafka topic, to a messsage queue, to an http 
request - anything that can get data into the system and trigger the intitial 
functions to begin computation.
+This can be anything from a Kafka topic, to a message queue, to an http 
request - anything that can get data into the system and trigger the initial 
functions to begin computation.
 
 ## Stateful Functions
 
@@ -63,7 +63,7 @@ When inside a function, while it is performing some 
computation, you are always
 
 ### Fault Tolerance
 
-For both state and messaging, Stateful Functions is able to provide the 
exactly-once guarantees users expect from a modern data processessing framework.
+For both state and messaging, Stateful Functions is able to provide the 
exactly-once guarantees users expect from a modern data processing framework.
 
 {{< img width="80%" src="/fig/concepts/statefun-app-fault-tolerance.svg" >}}
 
diff --git a/docs/content/docs/concepts/distributed_architecture.md 
b/docs/content/docs/concepts/distributed_architecture.md
index 3c64181..bebdf99 100644
--- a/docs/content/docs/concepts/distributed_architecture.md
+++ b/docs/content/docs/concepts/distributed_architecture.md
@@ -47,7 +47,7 @@ In addition to the Apache Flink processes, a full deployment 
requires [ZooKeeper
 
 ## Logical Co-location, Physical Separation
 
-A core principle of many Stream Processors is that application logic and the 
application state must be co-located. That approach is the basis for their 
out-of-the box consistency. Stateful Functions takes a unique approach to that 
by *logically co-locating* state and compute, but allowing to *physically 
separate* them.
+A core principle of many Stream Processors is that application logic, and the 
application state must be co-located. That approach is the basis for their 
out-of-the box consistency. Stateful Functions takes a unique approach to that 
by *logically co-locating* state and compute, but allowing to *physically 
separate* them.
 
   - *Logical co-location:* Messaging, state access/updates and function 
invocations are managed tightly together, in the same way as in Flink's 
DataStream API. State is sharded by key, and messages are routed to the state 
by key. There is a single writer per key at a time, also scheduling the 
function invocations.
 
@@ -60,7 +60,7 @@ The stateful functions themselves can be deployed in various 
ways that trade off
 
  Remote Functions
 
-*Remote Functions* use the above-mentioned principle of *physical separation* 
while maintaining *logical co-location*. The state/messaging tier (i.e., the 
Flink processes) and the function tier are deployed, managed, and scaled 
independently.
+*Remote Functions* use the above-mentioned principle of *physical separation* 
while maintaining *logical co-location*. The state/messaging tier (i.e., the 
Flink processes), and the function tier are deployed, managed, and scaled 
independently.
 
 Function invocations happen through an HTTP / gRPC protocol and go through a 
service that routes invocation requests to any available endpoint, for example 
a Kubernetes (load-balancing) service, the AWS request gateway for Lambda, etc. 
Because invocations are self-contained (contain message, state, access to 
timers, etc.) the target functions can be treated like any stateless 
application.
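
To ground the paragraph above, a hedged sketch of a remote function in the StateFun 3.0 Java SDK; the class is invented and the SDK signatures are written from memory, so treat them as assumptions:

```java
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.message.Message;

// Each invocation arrives self-contained (message, state, timer access), so the
// handler keeps nothing in-process between calls and can run behind any
// HTTP/gRPC load balancer like a stateless service.
public class GreeterFn implements StatefulFunction {
    @Override
    public CompletableFuture<Void> apply(Context context, Message message) {
        // Read the message and any attached state here; reply via context.send(...).
        return context.done();
    }
}
```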
 
diff --git a/docs/content/docs/concepts/logical.md 
b/docs/content/docs/concepts/logical.md
index 20f08f4..babf3f9 100644
--- a/docs/content/docs/concepts/logical.md

[flink-statefun] 01/01: [hotfix][docs] Fix typos in concepts

2021-04-25 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 898bb9b3a5c650b0e318f48577796be87107a83b
Author: ariskk 
AuthorDate: Sat Apr 24 11:31:08 2021 +0100

[hotfix][docs] Fix typos in concepts

This closes #229.
---
 docs/content/docs/concepts/application-building-blocks.md | 6 +++---
 docs/content/docs/concepts/distributed_architecture.md | 4 ++--
 docs/content/docs/concepts/logical.md | 6 +++---
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/docs/content/docs/concepts/application-building-blocks.md 
b/docs/content/docs/concepts/application-building-blocks.md
index d82ca23..996a314 100644
--- a/docs/content/docs/concepts/application-building-blocks.md
+++ b/docs/content/docs/concepts/application-building-blocks.md
@@ -26,7 +26,7 @@ under the License.
 
 # Application Building Blocks
 
-Stateful Functions provides a framework for building event drivent 
applications. Here, we explain important aspects of Stateful Function’s 
architecture.
+Stateful Functions provides a framework for building event driven 
applications. Here, we explain important aspects of Stateful Function’s 
architecture.
 
 ## Event Ingress
 
@@ -36,7 +36,7 @@ Stateful Function applications sit squarely in the event 
driven space, so the na
 
 
 In Stateful Functions, the component that ingests records into the system is 
called an event ingress.
-This can be anything from a Kafka topic, to a messsage queue, to an http 
request - anything that can get data into the system and trigger the intitial 
functions to begin computation.
+This can be anything from a Kafka topic, to a message queue, to an http 
request - anything that can get data into the system and trigger the initial 
functions to begin computation.
 
 ## Stateful Functions
 
@@ -63,7 +63,7 @@ When inside a function, while it is performing some 
computation, you are always
 
 ### Fault Tolerance
 
-For both state and messaging, Stateful Functions is able to provide the 
exactly-once guarantees users expect from a modern data processessing framework.
+For both state and messaging, Stateful Functions is able to provide the 
exactly-once guarantees users expect from a modern data processing framework.
 
 {{< img width="80%" src="/fig/concepts/statefun-app-fault-tolerance.svg" >}}
 
diff --git a/docs/content/docs/concepts/distributed_architecture.md 
b/docs/content/docs/concepts/distributed_architecture.md
index 3c64181..bebdf99 100644
--- a/docs/content/docs/concepts/distributed_architecture.md
+++ b/docs/content/docs/concepts/distributed_architecture.md
@@ -47,7 +47,7 @@ In addition to the Apache Flink processes, a full deployment 
requires [ZooKeeper
 
 ## Logical Co-location, Physical Separation
 
-A core principle of many Stream Processors is that application logic and the 
application state must be co-located. That approach is the basis for their 
out-of-the box consistency. Stateful Functions takes a unique approach to that 
by *logically co-locating* state and compute, but allowing to *physically 
separate* them.
+A core principle of many Stream Processors is that application logic, and the 
application state must be co-located. That approach is the basis for their 
out-of-the box consistency. Stateful Functions takes a unique approach to that 
by *logically co-locating* state and compute, but allowing to *physically 
separate* them.
 
   - *Logical co-location:* Messaging, state access/updates and function 
invocations are managed tightly together, in the same way as in Flink's 
DataStream API. State is sharded by key, and messages are routed to the state 
by key. There is a single writer per key at a time, also scheduling the 
function invocations.
 
@@ -60,7 +60,7 @@ The stateful functions themselves can be deployed in various 
ways that trade off
 
  Remote Functions
 
-*Remote Functions* use the above-mentioned principle of *physical separation* 
while maintaining *logical co-location*. The state/messaging tier (i.e., the 
Flink processes) and the function tier are deployed, managed, and scaled 
independently.
+*Remote Functions* use the above-mentioned principle of *physical separation* 
while maintaining *logical co-location*. The state/messaging tier (i.e., the 
Flink processes), and the function tier are deployed, managed, and scaled 
independently.
 
 Function invocations happen through an HTTP / gRPC protocol and go through a 
service that routes invocation requests to any available endpoint, for example 
a Kubernetes (load-balancing) service, the AWS request gateway for Lambda, etc. 
Because invocations are self-contained (contain message, state, access to 
timers, etc.) the target functions can be treated like any stateless 
application.
 
diff --git a/docs/content/docs/concepts/logical.md 
b/docs/content/docs/concepts/logical.md
index 20f08f4..babf3f9 100644
--- a/docs/content/docs/concepts/logical.md

[flink-statefun] branch master updated (7b51c3e -> 898bb9b)

2021-04-25 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


from 7b51c3e  [hotfix] Add missing configuration to documentation
 add 393e2bd  [hotfix][docs] Fix typos in apache-kafka.md and aws-kinesis.md
 new 898bb9b  [hotfix][docs] Fix typos in concepts

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/docs/concepts/application-building-blocks.md | 6 +++---
 docs/content/docs/concepts/distributed_architecture.md | 4 ++--
 docs/content/docs/concepts/logical.md | 6 +++---
 docs/content/docs/io-module/apache-kafka.md   | 2 +-
 docs/content/docs/io-module/aws-kinesis.md| 2 +-
 5 files changed, 10 insertions(+), 10 deletions(-)


svn commit: r47415 - /release/flink/KEYS

2021-04-25 Thread dianfu
Author: dianfu
Date: Mon Apr 26 03:42:04 2021
New Revision: 47415

Log:
Update Arvid Heise key

Modified:
release/flink/KEYS

Modified: release/flink/KEYS
==============================================================================
--- release/flink/KEYS (original)
+++ release/flink/KEYS Mon Apr 26 03:42:04 2021
@@ -2278,6 +2278,7 @@ uid   [ unknown] Arvid Heise 
 
 -----BEGIN PGP PUBLIC KEY BLOCK-----
+
 mQINBF9h4EcBEADBTkuM/2qlMWrfFWUkudkOK4MkqT5C5y7c+Jrl7V2Rofoc0Sg/
 nfkOxOuPCMzew3i/24EgGa0ATMKrxrUSxuDReO9oEsrHI8omJeZIxQ0tToBD8dKx
 CinQBNB0pGManHAOcvoOcwvrb+FF/j0BAYlJ2ABARsk69aB7axht9h2Udh8UUSIv




[flink] branch master updated (b2e65a4 -> 075b0b2)

2021-04-25 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b2e65a4  [FLINK-21923][table-planner-blink] Fix ClassCastException in 
SplitAggregateRule when a query contains both sum/count and avg function
 add 362f4ae  [FLINK-19449][doc] Fix wrong document for lead and lag
 add 9b9d4f0  [FLINK-19449][table] Introduce LinkedListSerializer
 add bf8f998  [FLINK-19449][table] Pass isBounded to AggFunctionFactory
 add 075b0b2  [FLINK-19449][table-planner] LEAD/LAG cannot work correctly 
in streaming mode

No new revisions were added by this update.

Summary of changes:
 docs/data/sql_functions.yml |   6 +-
 .../functions/aggfunctions/LagAggFunction.java | 163 +
 .../stream/StreamExecGlobalWindowAggregate.java|   4 +-
 .../stream/StreamExecLocalWindowAggregate.java |   2 +-
 .../exec/stream/StreamExecWindowAggregate.java |   2 +-
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |  25 ++--
 .../StreamPhysicalGlobalWindowAggregate.scala  |   2 +-
 .../StreamPhysicalLocalWindowAggregate.scala   |   2 +-
 .../stream/StreamPhysicalWindowAggregate.scala |   2 +-
 .../planner/plan/utils/AggFunctionFactory.scala|  35 -
 .../table/planner/plan/utils/AggregateUtil.scala   |  27 +++-
 .../functions/aggfunctions/LagAggFunctionTest.java |  62 
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   9 +-
 .../runtime/stream/sql/OverAggregateITCase.scala   |  68 +
 .../runtime/typeutils/LinkedListSerializer.java|  97 +++-
 .../typeutils/LinkedListSerializerTest.java|  44 +++---
 16 files changed, 449 insertions(+), 101 deletions(-)
 create mode 100644 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LagAggFunction.java
 create mode 100644 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/LagAggFunctionTest.java
 copy 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
 => 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
 (59%)
 copy 
flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
 => 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
 (52%)
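
For context, the query shape this change fixes is a streaming LAG over-aggregation; a hedged Java sketch (table name, schema, and the datagen connector are chosen for illustration):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: LAG over an event-time OVER window in streaming mode, the case
// FLINK-19449 makes work correctly. Table and schema are invented.
TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
tEnv.executeSql(
        "CREATE TABLE Orders (user_id STRING, amount DOUBLE, ts TIMESTAMP(3), "
                + "WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen')");
tEnv.executeSql(
        "SELECT user_id, amount, "
                + "LAG(amount) OVER (PARTITION BY user_id ORDER BY ts) AS prev_amount "
                + "FROM Orders");
```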


[flink] branch release-1.13 updated: [FLINK-21923][table-planner-blink] Fix ClassCastException in SplitAggregateRule when a query contains both sum/count and avg function

2021-04-25 Thread godfrey
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new a3363b9  [FLINK-21923][table-planner-blink] Fix ClassCastException in 
SplitAggregateRule when a query contains both sum/count and avg function
a3363b9 is described below

commit a3363b91b144edfbae5ab114984ded622d3f8fbc
Author: Tartarus0zm 
AuthorDate: Tue Apr 6 16:41:56 2021 +0800

[FLINK-21923][table-planner-blink] Fix ClassCastException in 
SplitAggregateRule when a query contains both sum/count and avg function

This closes #15341

(cherry picked from commit b2e65a41914766ab4b1f3495f7196611561fea4c)
---
 .../plan/rules/logical/SplitAggregateRule.scala | 32 ++
 .../plan/rules/logical/SplitAggregateRuleTest.xml  | 31 +
 .../rules/logical/SplitAggregateRuleTest.scala | 19 +
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 23 
 4 files changed, 94 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index be94ba1..31d1f25 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
 import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.doAllAggSupportSplit
-import org.apache.flink.table.planner.plan.utils.{ExpandUtil, WindowUtil}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ExpandUtil, 
WindowUtil}
 
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
@@ -138,9 +138,11 @@ class SplitAggregateRule extends RelOptRule(
 val windowProps = fmq.getRelWindowProperties(agg.getInput)
 val isWindowAgg = 
WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps)
 val isProctimeWindowAgg = isWindowAgg && !windowProps.isRowtime
+// TableAggregate is not supported. see also FLINK-21923.
+val isTableAgg = AggregateUtil.isTableAggregate(agg.getAggCallList)
 
 agg.partialFinalType == PartialFinalType.NONE && 
agg.containsDistinctCall() &&
-  splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg
+  splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg && 
!isTableAgg
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
@@ -280,11 +282,16 @@ class SplitAggregateRule extends RelOptRule(
 }
 
 // STEP 2.3: construct partial aggregates
-relBuilder.aggregate(
-  relBuilder.groupKey(fullGroupSet, 
ImmutableList.of[ImmutableBitSet](fullGroupSet)),
+// Create aggregate node directly to avoid ClassCastException,
+// Please see FLINK-21923 for more details.
+// TODO reuse aggregate function, see FLINK-22412
+val partialAggregate = FlinkLogicalAggregate.create(
+  relBuilder.build(),
+  fullGroupSet,
+  ImmutableList.of[ImmutableBitSet](fullGroupSet),
   newPartialAggCalls)
-relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
-  .setPartialFinalType(PartialFinalType.PARTIAL)
+partialAggregate.setPartialFinalType(PartialFinalType.PARTIAL)
+relBuilder.push(partialAggregate)
 
 // STEP 3: construct final aggregates
 val finalAggInputOffset = fullGroupSet.cardinality
@@ -306,13 +313,16 @@ class SplitAggregateRule extends RelOptRule(
 needMergeFinalAggOutput = true
   }
 }
-relBuilder.aggregate(
-  relBuilder.groupKey(
-SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
-SplitAggregateRule.remap(fullGroupSet, 
Seq(originalAggregate.getGroupSet))),
+// Create aggregate node directly to avoid ClassCastException,
+// Please see FLINK-21923 for more details.
+// TODO reuse aggregate function, see FLINK-22412
+val finalAggregate = FlinkLogicalAggregate.create(
+  relBuilder.build(),
+  SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
+  SplitAggregateRule.remap(fullGroupSet, 
Seq(originalAggregate.getGroupSet)),
   finalAggCalls)
-val finalAggregate = relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
 finalAggregate.setPartialFinalType(PartialFinalType.FINAL)
+relBuilder.push(finalAggregate)
 
 // STEP 4: convert final aggregation output to the original aggregation output.

[flink] branch master updated: [FLINK-21923][table-planner-blink] Fix ClassCastException in SplitAggregateRule when a query contains both sum/count and avg function

2021-04-25 Thread godfrey
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b2e65a4  [FLINK-21923][table-planner-blink] Fix ClassCastException in 
SplitAggregateRule when a query contains both sum/count and avg function
b2e65a4 is described below

commit b2e65a41914766ab4b1f3495f7196611561fea4c
Author: Tartarus0zm 
AuthorDate: Tue Apr 6 16:41:56 2021 +0800

[FLINK-21923][table-planner-blink] Fix ClassCastException in 
SplitAggregateRule when a query contains both sum/count and avg function

This closes #15341
---
 .../plan/rules/logical/SplitAggregateRule.scala | 32 ++
 .../plan/rules/logical/SplitAggregateRuleTest.xml  | 31 +
 .../rules/logical/SplitAggregateRuleTest.scala | 19 +
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 23 
 4 files changed, 94 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index be94ba1..31d1f25 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
 import 
org.apache.flink.table.planner.plan.utils.AggregateUtil.doAllAggSupportSplit
-import org.apache.flink.table.planner.plan.utils.{ExpandUtil, WindowUtil}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, ExpandUtil, 
WindowUtil}
 
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
@@ -138,9 +138,11 @@ class SplitAggregateRule extends RelOptRule(
 val windowProps = fmq.getRelWindowProperties(agg.getInput)
 val isWindowAgg = 
WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps)
 val isProctimeWindowAgg = isWindowAgg && !windowProps.isRowtime
+// TableAggregate is not supported. see also FLINK-21923.
+val isTableAgg = AggregateUtil.isTableAggregate(agg.getAggCallList)
 
 agg.partialFinalType == PartialFinalType.NONE && 
agg.containsDistinctCall() &&
-  splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg
+  splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg && 
!isTableAgg
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
@@ -280,11 +282,16 @@ class SplitAggregateRule extends RelOptRule(
 }
 
 // STEP 2.3: construct partial aggregates
-relBuilder.aggregate(
-  relBuilder.groupKey(fullGroupSet, 
ImmutableList.of[ImmutableBitSet](fullGroupSet)),
+// Create aggregate node directly to avoid ClassCastException,
+// Please see FLINK-21923 for more details.
+// TODO reuse aggregate function, see FLINK-22412
+val partialAggregate = FlinkLogicalAggregate.create(
+  relBuilder.build(),
+  fullGroupSet,
+  ImmutableList.of[ImmutableBitSet](fullGroupSet),
   newPartialAggCalls)
-relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
-  .setPartialFinalType(PartialFinalType.PARTIAL)
+partialAggregate.setPartialFinalType(PartialFinalType.PARTIAL)
+relBuilder.push(partialAggregate)
 
 // STEP 3: construct final aggregates
 val finalAggInputOffset = fullGroupSet.cardinality
@@ -306,13 +313,16 @@ class SplitAggregateRule extends RelOptRule(
 needMergeFinalAggOutput = true
   }
 }
-relBuilder.aggregate(
-  relBuilder.groupKey(
-SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
-SplitAggregateRule.remap(fullGroupSet, 
Seq(originalAggregate.getGroupSet))),
+// Create aggregate node directly to avoid ClassCastException,
+// Please see FLINK-21923 for more details.
+// TODO reuse aggregate function, see FLINK-22412
+val finalAggregate = FlinkLogicalAggregate.create(
+  relBuilder.build(),
+  SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
+  SplitAggregateRule.remap(fullGroupSet, 
Seq(originalAggregate.getGroupSet)),
   finalAggCalls)
-val finalAggregate = relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
 finalAggregate.setPartialFinalType(PartialFinalType.FINAL)
+relBuilder.push(finalAggregate)
 
 // STEP 4: convert final aggregation output to the original aggregation 
output.
 // For example, aggr
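
For context, a hedged sketch of the query shape that triggered the ClassCastException: split distinct aggregation enabled (which builds on mini-batch execution) plus a mix of SUM/COUNT and AVG. All names are invented:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: with distinct-agg splitting on, a query mixing SUM/COUNT DISTINCT and
// AVG exercises SplitAggregateRule, the code path fixed by FLINK-21923.
TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "1 s");
tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "1000");
tEnv.getConfig().getConfiguration().setString(
        "table.optimizer.distinct-agg.split.enabled", "true");
tEnv.executeSql(
        "CREATE TABLE T (a INT, b STRING, c STRING) WITH ('connector' = 'datagen')");
tEnv.executeSql("SELECT c, SUM(a), COUNT(DISTINCT b), AVG(a) FROM T GROUP BY c");
```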

[flink] branch master updated (14ae6fe -> 033cdea)

2021-04-25 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 14ae6fe  [hotfix][tests] Remove timeout rule from KafkaTableTestBase
 add 033cdea  [FLINK-21247][table] Fix problem in MapDataSerializer#copy 
when there exists custom MapData

No new revisions were added by this update.

Summary of changes:
 .../planner/factories/TestValuesTableFactory.java  | 98 +-
 .../planner/runtime/stream/sql/CalcITCase.scala| 48 ++-
 .../table/runtime/typeutils/MapDataSerializer.java |  7 +-
 .../runtime/typeutils/MapDataSerializerTest.java   | 51 +++
 4 files changed, 197 insertions(+), 7 deletions(-)


[flink] branch release-1.13 updated: [hotfix][tests] Remove timeout rule from KafkaTableTestBase

2021-04-25 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new f027863  [hotfix][tests] Remove timeout rule from KafkaTableTestBase
f027863 is described below

commit f0278639491d034ef6818516c9c5233d1e69a393
Author: Stephan Ewen 
AuthorDate: Sun Apr 25 20:22:45 2021 +0200

[hotfix][tests] Remove timeout rule from KafkaTableTestBase

That way, we get thread dumps from the CI infrastructure when the test 
hangs.
---
 .../flink/streaming/connectors/kafka/table/KafkaTableTestBase.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 97229be..c57e3c7 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -29,8 +29,6 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.Timeout;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.utility.DockerImageName;
@@ -54,8 +52,6 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
 .withNetwork(NETWORK)
 .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
-@Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
-
 protected StreamExecutionEnvironment env;
 protected StreamTableEnvironment tEnv;
 


[flink] branch master updated: [hotfix][tests] Remove timeout rule from KafkaTableTestBase

2021-04-25 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 14ae6fe  [hotfix][tests] Remove timeout rule from KafkaTableTestBase
14ae6fe is described below

commit 14ae6fee5c835871ea0145b94fbc1e8585fe78b3
Author: Stephan Ewen 
AuthorDate: Sun Apr 25 20:22:45 2021 +0200

[hotfix][tests] Remove timeout rule from KafkaTableTestBase

That way, we get thread dumps from the CI infrastructure when the test 
hangs.
---
 .../flink/streaming/connectors/kafka/table/KafkaTableTestBase.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 97229be..c57e3c7 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -29,8 +29,6 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.Timeout;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.utility.DockerImageName;
@@ -54,8 +52,6 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
 .withNetwork(NETWORK)
 .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
-@Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
-
 protected StreamExecutionEnvironment env;
 protected StreamTableEnvironment tEnv;
 


[flink] branch master updated: [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with Adaptive Scheduler.

2021-04-25 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e676442  [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase 
work with Adaptive Scheduler.
e676442 is described below

commit e676442b9faa1ec0b668e8394dd2353ac2de01c6
Author: Stephan Ewen 
AuthorDate: Fri Apr 23 17:17:01 2021 +0200

[FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with 
Adaptive Scheduler.

The test previously relied on an implicit contract that instances of 
OperatorCoordinators are never recreated
on the same JobManager. That implicit contract is no longer true with the 
Adaptive Scheduler.

This change adjusts the test to no longer make that assumption.

This closes #15739
---
 .../CoordinatorEventsExactlyOnceITCase.java| 112 -
 1 file changed, 86 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index edc9ede..2337115 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -64,11 +64,13 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -169,6 +171,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
 @Test
 public void test() throws Exception {
+// this captures variables communicated across instances, recoveries, 
etc.
+TestScript.reset();
+
 final int numEvents1 = 200;
 final int numEvents2 = 5;
 final int delay1 = 1;
@@ -296,19 +301,23 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
 private final int delay;
 private final int maxNumber;
+private final int failAtMessage;
 private int nextNumber;
 
 private CompletableFuture requestedCheckpoint;
 private CompletableFuture nextToComplete;
 
-private final int failAtMessage;
-private boolean failedBefore;
-
-private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new 
ArrayDeque<>();
-
 private SubtaskGateway subtaskGateway;
 private boolean workLoopRunning;
 
+/**
+ * This contains all variables that are necessary to track the 
progress of the test, and
+ * which need to be tracked across instances of this coordinator (some 
scheduler
+ * implementations may re-instantiate the ExecutionGraph and the 
coordinators around global
+ * failures).
+ */
+private final TestScript testScript;
+
 private EventSendingCoordinator(Context context, String name, int 
numEvents, int delay) {
 checkArgument(delay > 0);
 checkArgument(numEvents >= 3);
@@ -316,6 +325,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 this.context = context;
 this.maxNumber = numEvents;
 this.delay = delay;
+
+this.testScript = TestScript.getForOperator(name);
+
 this.mailboxExecutor =
 Executors.newSingleThreadExecutor(
 new DispatcherThreadFactory(
@@ -349,17 +361,12 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 String.format("Don't recognize event '%s' from task 
%d.", event, subtask));
 }
 
-// We complete all events that were enqueued. We may need to 
complete
-// multiple ones here, because it can happen that after a failure 
no real recovery
-// happens that results in an event being sent (and this method 
being called), but that
-// immediately another failure comes, triggered by the other 
operator coordinator (or
-// its task).
-synchronized (recoveredTaskRunning) {
-for (CountDownLatch latch : recoveredTaskRunning) {
-latch.countDown();
-}
-recoveredTaskRunning.clear();
-}
+// this unblocks all the delayed actions that were kicked off 
while the previous
+// task was still
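
The diff above calls into a `TestScript` helper whose definition lies outside this excerpt; a hypothetical sketch of such a cross-instance registry, matching only the calls visible in the diff (`TestScript.reset()`, `TestScript.getForOperator(name)`), with all other members invented:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a static per-operator registry that survives coordinator
// re-instantiation, since some schedulers rebuild the ExecutionGraph (and the
// coordinators) on global failures.
final class TestScript {
    private static final Map<String, TestScript> SCRIPTS = new HashMap<>();

    private boolean failedBefore;

    static synchronized TestScript getForOperator(String operatorName) {
        return SCRIPTS.computeIfAbsent(operatorName, name -> new TestScript());
    }

    static synchronized void reset() {
        SCRIPTS.clear();
    }

    synchronized boolean hasAlreadyFailed() {
        return failedBefore;
    }

    synchronized void recordHasFailed() {
        failedBefore = true;
    }
}
```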

[flink] branch release-1.13 updated: [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with Adaptive Scheduler.

2021-04-25 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new 0dc6326  [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase 
work with Adaptive Scheduler.
0dc6326 is described below

commit 0dc632681defaa1d66d3b2e884f311121467d894
Author: Stephan Ewen 
AuthorDate: Fri Apr 23 17:17:01 2021 +0200

[FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with 
Adaptive Scheduler.

The test previously relied on an implicit contract that instances of 
OperatorCoordinators are never recreated
on the same JobManager. That implicit contract is no longer true with the 
Adaptive Scheduler.

This change adjusts the test to no longer make that assumption.

This closes #15739
---
 .../CoordinatorEventsExactlyOnceITCase.java| 112 -
 1 file changed, 86 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index edc9ede..2337115 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -64,11 +64,13 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -169,6 +171,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
 @Test
 public void test() throws Exception {
+// this captures variables communicated across instances, recoveries, 
etc.
+TestScript.reset();
+
 final int numEvents1 = 200;
 final int numEvents2 = 5;
 final int delay1 = 1;
@@ -296,19 +301,23 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
 private final int delay;
 private final int maxNumber;
+private final int failAtMessage;
 private int nextNumber;
 
 private CompletableFuture requestedCheckpoint;
 private CompletableFuture nextToComplete;
 
-private final int failAtMessage;
-private boolean failedBefore;
-
-private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new 
ArrayDeque<>();
-
 private SubtaskGateway subtaskGateway;
 private boolean workLoopRunning;
 
+/**
+ * This contains all variables that are necessary to track the 
progress of the test, and
+ * which need to be tracked across instances of this coordinator (some 
scheduler
+ * implementations may re-instantiate the ExecutionGraph and the 
coordinators around global
+ * failures).
+ */
+private final TestScript testScript;
+
 private EventSendingCoordinator(Context context, String name, int 
numEvents, int delay) {
 checkArgument(delay > 0);
 checkArgument(numEvents >= 3);
@@ -316,6 +325,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 this.context = context;
 this.maxNumber = numEvents;
 this.delay = delay;
+
+this.testScript = TestScript.getForOperator(name);
+
 this.mailboxExecutor =
 Executors.newSingleThreadExecutor(
 new DispatcherThreadFactory(
@@ -349,17 +361,12 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 String.format("Don't recognize event '%s' from task 
%d.", event, subtask));
 }
 
-// We complete all events that were enqueued. We may need to 
complete
-// multiple ones here, because it can happen that after a failure 
no real recovery
-// happens that results in an event being sent (and this method 
being called), but that
-// immediately another failure comes, triggered by the other 
operator coordinator (or
-// its task).
-synchronized (recoveredTaskRunning) {
-for (CountDownLatch latch : recoveredTaskRunning) {
-latch.countDown();
-}
-recoveredTaskRunning.clear();
-}
+// this unblocks all the delayed actions that were kicked off 
while the previous
+// t

[flink] branch release-1.13 updated: [FLINK-22356][hive][filesystem] Fix partition-time commit failure when watermark is defined on TIMESTAMP_LTZ column

2021-04-25 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new 740ad3d  [FLINK-22356][hive][filesystem] Fix partition-time commit 
failure when watermark is defined on TIMESTAMP_LTZ column
740ad3d is described below

commit 740ad3df3ff6a33cb33223a71e174f989b5b04aa
Author: Leonard Xu 
AuthorDate: Sat Apr 24 00:26:10 2021 +0800

[FLINK-22356][hive][filesystem] Fix partition-time commit failure when 
watermark is defined on TIMESTAMP_LTZ column

This closes #15709
---
 .../content.zh/docs/connectors/table/filesystem.md |  50 -
 .../docs/connectors/table/hive/hive_read_write.md  |  37 ++-
 docs/content/docs/connectors/table/filesystem.md   |  48 +++-
 .../docs/connectors/table/hive/hive_read_write.md  |  37 ++-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 121 +
 .../flink/table/filesystem/FileSystemOptions.java  |  13 +++
 .../table/filesystem/FileSystemTableFactory.java   |  17 +++
 .../stream/PartitionTimeCommitTrigger.java |  23 +++-
 .../filesystem/FileSystemTableFactoryTest.java |  27 +
 9 files changed, 339 insertions(+), 34 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/filesystem.md 
b/docs/content.zh/docs/connectors/table/filesystem.md
index cfee0bd..b3dfeb2 100644
--- a/docs/content.zh/docs/connectors/table/filesystem.md
+++ b/docs/content.zh/docs/connectors/table/filesystem.md
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition 
commit trigger:
 Duration
 The partition will not commit until the delay time. If it is a 
daily partition, should be '1 d', if it is a hourly partition, should be '1 
h'.
 
+
+sink.partition-commit.watermark-time-zone
+UTC
+String
+The time zone used to parse the long watermark value into a TIMESTAMP 
value; the parsed watermark timestamp is compared with the partition time to 
decide whether the partition should be committed. This option only takes effect when 
`sink.partition-commit.trigger` is set to 'partition-time'. If this option is 
not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ 
column but this config is not set, then users may see the partition 
committed only after a few hours. Th [...]
+
   
 
 
@@ -401,7 +407,7 @@ The parallelism of writing files into external file system 
(including Hive) can
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out.
+The below examples show how the file system connector can be used to write a 
streaming query to write data from Kafka into a file system and runs a batch 
query to read that data back out.
 
 ```sql
 
@@ -409,7 +415,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP column
 ) WITH (...);
 
 CREATE TABLE fs_table (
@@ -438,4 +444,44 @@ FROM kafka_table;
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
+If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is 
used to commit, then `sink.partition-commit.watermark-time-zone` must be set to 
the session time zone; otherwise the partition commit may happen a few hours late.
+```sql
+
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP_LTZ column
+) WITH (...);
+
+CREATE TABLE fs_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  dt STRING,
+  `hour` STRING
+) PARTITIONED BY (dt, `hour`) WITH (
+  'connector'='filesystem',
+  'path'='...',
+  'format'='parquet',
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user 
configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='success-file'
+);
+
+-- streaming sql, insert into file system table
+INSERT INTO fs_table 
+SELECT 
+user_id, 
+order_amount, 
+DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
+DATE_FORMAT(ts_ltz, 'HH') 
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
+```
+
 {{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md 
b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index d0efdeb..6d45f8d 100644
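
As a usage note for the docs change above, the watermark time zone option is meant to match the session time zone; a hedged Java sketch (the zone mirrors the 'Asia/Shanghai' example in the diff):

```java
import java.time.ZoneId;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: keep the session time zone and the partition-commit watermark time
// zone aligned so partition-time commits are not delayed by hours.
TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// Matching table option from the example above:
//   'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai'
```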

[flink] branch master updated: [FLINK-22356][hive][filesystem] Fix partition-time commit failure when watermark is defined on TIMESTAMP_LTZ column

2021-04-25 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new bddcbc4  [FLINK-22356][hive][filesystem] Fix partition-time commit 
failure when watermark is defined on TIMESTAMP_LTZ column
bddcbc4 is described below

commit bddcbc4f93a8d473eafc0f9a1f976d43aa3af4ca
Author: Leonard Xu 
AuthorDate: Sat Apr 24 00:26:10 2021 +0800

[FLINK-22356][hive][filesystem] Fix partition-time commit failure when 
watermark is defined on TIMESTAMP_LTZ column

This closes #15709
---
 .../content.zh/docs/connectors/table/filesystem.md |  50 -
 .../docs/connectors/table/hive/hive_read_write.md  |  37 ++-
 docs/content/docs/connectors/table/filesystem.md   |  48 +++-
 .../docs/connectors/table/hive/hive_read_write.md  |  37 ++-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 121 +
 .../flink/table/filesystem/FileSystemOptions.java  |  13 +++
 .../table/filesystem/FileSystemTableFactory.java   |  17 +++
 .../stream/PartitionTimeCommitTrigger.java |  23 +++-
 .../filesystem/FileSystemTableFactoryTest.java |  27 +
 9 files changed, 339 insertions(+), 34 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/filesystem.md 
b/docs/content.zh/docs/connectors/table/filesystem.md
index cfee0bd..b3dfeb2 100644
--- a/docs/content.zh/docs/connectors/table/filesystem.md
+++ b/docs/content.zh/docs/connectors/table/filesystem.md
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition 
commit trigger:
 Duration
 The partition will not commit until the delay time. If it is a 
daily partition, should be '1 d', if it is a hourly partition, should be '1 
h'.
 
+
+sink.partition-commit.watermark-time-zone
+UTC
+String
+The time zone used to parse the long watermark value into a TIMESTAMP 
value; the parsed watermark timestamp is compared with the partition time to 
decide whether the partition should be committed. This option only takes effect when 
`sink.partition-commit.trigger` is set to 'partition-time'. If this option is 
not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ 
column but this config is not set, then users may see the partition 
committed only after a few hours. Th [...]
+
   
 
 
@@ -401,7 +407,7 @@ The parallelism of writing files into external file system 
(including Hive) can
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out.
+The below examples show how the file system connector can be used to write a 
streaming query to write data from Kafka into a file system and runs a batch 
query to read that data back out.
 
 ```sql
 
@@ -409,7 +415,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP column
 ) WITH (...);
 
 CREATE TABLE fs_table (
@@ -438,4 +444,44 @@ FROM kafka_table;
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
+If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is 
used to commit, then `sink.partition-commit.watermark-time-zone` must be set to 
the session time zone; otherwise the partition commit may happen a few hours late.
+```sql
+
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on 
TIMESTAMP_LTZ column
+) WITH (...);
+
+CREATE TABLE fs_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  dt STRING,
+  `hour` STRING
+) PARTITIONED BY (dt, `hour`) WITH (
+  'connector'='filesystem',
+  'path'='...',
+  'format'='parquet',
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user 
configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='success-file'
+);
+
+-- streaming sql, insert into file system table
+INSERT INTO fs_table 
+SELECT 
+user_id, 
+order_amount, 
+DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
+DATE_FORMAT(ts_ltz, 'HH') 
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
+```
+
 {{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md 
b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index d0efdeb..6d45f8d 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md

[flink] branch master updated (0f8adc0 -> 6071f96)

2021-04-25 Thread lirui
This is an automated email from the ASF dual-hosted git repository.

lirui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 0f8adc0  [FLINK-20720][python][docs] Add documentation for 
ProcessFunction in Python DataStream API
 add 6071f96  [FLINK-22294][hive] Hive reading fail when getting file 
numbers on different filesystem nameservices

No new revisions were added by this update.

Summary of changes:
 .../connectors/hive/HiveSourceFileEnumerator.java   |  5 +
 .../connectors/hive/read/HiveTableInputFormat.java  | 21 -
 2 files changed, 1 insertion(+), 25 deletions(-)