[flink] branch master updated (d81ac48 -> 886b01d)

2019-07-02 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d81ac48  [hotfix][docs] remove duplicate `to` in state doc
 new d1b22b5  [FLINK-13049][table-planner-blink] Rename windowProperties 
and PlannerResolvedFieldReference to avoid name conflict
 new 886b01d  [FLINK-13049][table-planner-blink] Port planner expressions 
to blink-planner from flink-planner

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 142 
 .../table/functions/sql/FlinkSqlOperatorTable.java |   1 +
 .../table/api/ExpressionParserException.scala} |   0
 .../flink/table/calcite/FlinkRelBuilder.scala  |   4 +-
 .../flink/table/calcite/FlinkTypeFactory.scala |  21 +-
 .../codegen/agg/AggsHandlerCodeGenerator.scala |  14 +-
 .../agg/batch/HashWindowCodeGenerator.scala|   4 +-
 .../agg/batch/SortWindowCodeGenerator.scala|   4 +-
 .../codegen/agg/batch/WindowCodeGenerator.scala|   4 +-
 ...nce.scala => ExestingFieldFieldReference.scala} |   2 +-
 .../flink/table/expressions/ExpressionBridge.scala |   0
 .../flink/table/expressions/InputTypeSpec.scala|   0
 .../table/expressions/PlannerExpression.scala  |   0
 .../expressions/PlannerExpressionConverter.scala   | 836 +
 .../expressions/PlannerExpressionParserImpl.scala  | 726 ++
 .../table/expressions/PlannerExpressionUtils.scala |   0
 .../flink/table/expressions/aggregations.scala | 439 +++
 .../flink/table/expressions/arithmetic.scala   | 165 
 .../org/apache/flink/table/expressions/call.scala  | 326 
 .../org/apache/flink/table/expressions/cast.scala  |  59 ++
 .../flink/table/expressions/collection.scala   | 235 ++
 .../flink/table/expressions/comparison.scala   | 242 ++
 .../apache/flink/table/expressions/composite.scala |   0
 .../flink/table/expressions/fieldExpression.scala  | 253 +++
 .../flink/table/expressions/hashExpressions.scala  | 124 +++
 .../apache/flink/table/expressions/literals.scala  | 139 
 .../org/apache/flink/table/expressions/logic.scala | 109 +++
 .../flink/table/expressions/mathExpressions.scala  | 532 +
 .../apache/flink/table/expressions/ordering.scala  |   0
 .../flink/table/expressions/overOffsets.scala  |   0
 .../apache/flink/table/expressions/package.scala   |   0
 ...perties.scala => plannerWindowProperties.scala} |  24 +-
 .../table/expressions/stringExpressions.scala  | 585 ++
 .../apache/flink/table/expressions/subquery.scala  |  95 +++
 .../apache/flink/table/expressions/symbols.scala   |   0
 .../org/apache/flink/table/expressions/time.scala  | 369 +
 .../flink/table/expressions/windowProperties.scala |  73 +-
 .../functions/utils/UserDefinedFunctionUtils.scala |   8 +
 .../org/apache/flink/table/plan/TreeNode.scala | 115 +++
 .../flink/table/plan/logical/groupWindows.scala|   8 +-
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |   4 +-
 .../plan/metadata/FlinkRelMdUniqueGroups.scala |   4 +-
 .../table/plan/metadata/FlinkRelMdUniqueKeys.scala |   4 +-
 .../nodes/calcite/LogicalWindowAggregate.scala |   8 +-
 .../table/plan/nodes/calcite/WindowAggregate.scala |   6 +-
 .../logical/FlinkLogicalWindowAggregate.scala  |   4 +-
 .../batch/BatchExecHashWindowAggregate.scala   |   4 +-
 .../batch/BatchExecHashWindowAggregateBase.scala   |   4 +-
 .../batch/BatchExecLocalHashWindowAggregate.scala  |   4 +-
 .../batch/BatchExecLocalSortWindowAggregate.scala  |   4 +-
 .../batch/BatchExecSortWindowAggregate.scala   |   4 +-
 .../batch/BatchExecSortWindowAggregateBase.scala   |   4 +-
 .../batch/BatchExecWindowAggregateBase.scala   |   6 +-
 .../stream/StreamExecGroupWindowAggregate.scala|   6 +-
 .../logical/LogicalWindowAggregateRuleBase.scala   |   8 +-
 .../plan/rules/logical/WindowPropertiesRule.scala  |  20 +-
 .../flink/table/plan/util/AggregateUtil.scala  |  20 +-
 .../flink/table/plan/util/FlinkRelMdUtil.scala |   4 +-
 .../flink/table/plan/util/RelExplainUtil.scala |   4 +-
 .../flink/table/plan/util/RexNodeExtractor.scala   |   9 +-
 .../flink/table/sources/TableSourceUtil.scala  |  25 +-
 .../table/sources/tsextractors/ExistingField.scala |   4 +-
 .../flink/table/typeutils/TypeInfoCheckUtils.scala | 277 +++
 .../flink/table/validate/ValidationResult.scala|   0
 .../flink/table/expressions/KeywordParseTest.scala |  62 ++
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  16 +-
 .../table/plan/util/RexNodeExtractorTest.scala | 195 ++---
 67 files changed, 6114 insertions(+), 254 deletions(-)
 create mode 100644 
flink-table/flink-t

[flink] 01/02: [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflict

2019-07-02 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1b22b56a92eef26998555dc35371447593afc8c
Author: JingsongLi 
AuthorDate: Tue Jul 2 09:56:38 2019 +0800

[FLINK-13049][table-planner-blink] Rename windowProperties and 
PlannerResolvedFieldReference to avoid name conflict
---
 .../flink/table/calcite/FlinkRelBuilder.scala  |  4 ++--
 .../codegen/agg/AggsHandlerCodeGenerator.scala | 14 ++--
 .../agg/batch/HashWindowCodeGenerator.scala|  4 ++--
 .../agg/batch/SortWindowCodeGenerator.scala|  4 ++--
 .../codegen/agg/batch/WindowCodeGenerator.scala|  4 ++--
 ...nce.scala => ExestingFieldFieldReference.scala} |  2 +-
 ...perties.scala => plannerWindowProperties.scala} | 24 -
 .../flink/table/plan/logical/groupWindows.scala|  8 +++
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  4 ++--
 .../plan/metadata/FlinkRelMdUniqueGroups.scala |  4 ++--
 .../table/plan/metadata/FlinkRelMdUniqueKeys.scala |  4 ++--
 .../nodes/calcite/LogicalWindowAggregate.scala |  8 +++
 .../table/plan/nodes/calcite/WindowAggregate.scala |  6 +++---
 .../logical/FlinkLogicalWindowAggregate.scala  |  4 ++--
 .../batch/BatchExecHashWindowAggregate.scala   |  4 ++--
 .../batch/BatchExecHashWindowAggregateBase.scala   |  4 ++--
 .../batch/BatchExecLocalHashWindowAggregate.scala  |  4 ++--
 .../batch/BatchExecLocalSortWindowAggregate.scala  |  4 ++--
 .../batch/BatchExecSortWindowAggregate.scala   |  4 ++--
 .../batch/BatchExecSortWindowAggregateBase.scala   |  4 ++--
 .../batch/BatchExecWindowAggregateBase.scala   |  6 +++---
 .../stream/StreamExecGroupWindowAggregate.scala|  6 +++---
 .../logical/LogicalWindowAggregateRuleBase.scala   |  8 +++
 .../plan/rules/logical/WindowPropertiesRule.scala  | 20 ++---
 .../flink/table/plan/util/AggregateUtil.scala  | 20 -
 .../flink/table/plan/util/FlinkRelMdUtil.scala |  4 ++--
 .../flink/table/plan/util/RelExplainUtil.scala |  4 ++--
 .../flink/table/sources/TableSourceUtil.scala  | 25 +++---
 .../table/sources/tsextractors/ExistingField.scala |  4 ++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 16 +++---
 30 files changed, 120 insertions(+), 111 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 4683424..8b5cf8a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, 
RankFactory, SinkFactory}
-import org.apache.flink.table.expressions.WindowProperty
+import org.apache.flink.table.expressions.PlannerWindowProperty
 import org.apache.flink.table.operations.QueryOperation
 import org.apache.flink.table.plan.QueryOperationConverter
 import org.apache.flink.table.runtime.rank.{RankRange, RankType}
@@ -111,7 +111,7 @@ object FlinkRelBuilder {
 *
 * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
 */
-  case class NamedWindowProperty(name: String, property: WindowProperty)
+  case class PlannerNamedWindowProperty(name: String, property: 
PlannerWindowProperty)
 
   def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() {
 def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder =
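The hunk above is representative of the whole commit: planner-internal expression types gain a `Planner` prefix so they no longer collide with the same-named classes in the public `org.apache.flink.table.expressions` API. A minimal, illustrative Scala sketch of the resulting naming pattern (the traits and fields below are placeholders, not the actual planner code):

{% highlight scala %}
// Illustrative only: a "Planner" prefix keeps the planner-internal window
// property types distinct from the same-named API-level expression classes.
trait PlannerWindowProperty
case class PlannerWindowStart(windowReference: String) extends PlannerWindowProperty
case class PlannerWindowEnd(windowReference: String) extends PlannerWindowProperty

// Analogous to the renamed case class in FlinkRelBuilder shown above.
case class PlannerNamedWindowProperty(name: String, property: PlannerWindowProperty)

object PlannerWindowPropertyExample extends App {
  val props = Seq(
    PlannerNamedWindowProperty("w$start", PlannerWindowStart("w")),
    PlannerNamedWindowProperty("w$end", PlannerWindowEnd("w")))
  props.foreach(p => println(s"${p.name} -> ${p.property}"))
}
{% endhighlight %}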
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
index f77810b..bf40b04 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -58,7 +58,7 @@ class AggsHandlerCodeGenerator(
 
   /** window properties like window_start and window_end, only used in window 
aggregates */
   private var namespaceClassName: String = _
-  private var windowProperties: Seq[WindowProperty] = Seq()
+  private var windowProperties: Seq[PlannerWindowProperty] = Seq()
   private var hasNamespace: Boolean = false
 
   /** Aggregates informations */
@@ -182,7 +182,7 @@ class AggsHandlerCodeGenerator(
 * Adds window properties such as window_start, window_end
 */
   private def initialWindowProperties(
-  windowProperties: Seq[WindowProperty],
+  windowProperties: Seq[PlannerWin

[flink] branch master updated: [hotfix][docs] remove duplicate `to` in state doc

2019-07-02 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d81ac48  [hotfix][docs] remove duplicate `to` in state doc
d81ac48 is described below

commit d81ac4899a1072ef38103572c6f7e459c94fa895
Author: sunjincheng121 
AuthorDate: Wed Jul 3 13:55:23 2019 +0800

[hotfix][docs] remove duplicate `to` in state doc
---
 docs/ops/state/large_state_tuning.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/ops/state/large_state_tuning.md 
b/docs/ops/state/large_state_tuning.md
index a98bb4a..e90880b 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -108,7 +108,7 @@ impact.
 
 For state to be snapshotted asynchronously, you need to use a state backend 
which supports asynchronous snapshotting.
 Starting from Flink 1.3, both RocksDB-based as well as heap-based state 
backends (`filesystem`) support asynchronous
-snapshotting and use it by default. This applies to to both managed operator 
state as well as managed keyed state (incl. timers state).
+snapshotting and use it by default. This applies to both managed operator 
state as well as managed keyed state (incl. timers state).
 
 Note *The combination RocksDB state 
backend with heap-based timers currently does NOT support asynchronous 
snapshots for the timers state.
 Other state like keyed state is still snapshotted asynchronously. Please note 
that this is not a regression from previous versions and will be resolved with 
`FLINK-10026`.*
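As a hedged illustration of the setup this doc page describes: asynchronous snapshotting is the default for these backends, but the backend itself still has to be chosen. A minimal sketch against the Flink 1.x APIs referenced above (the checkpoint path and interval are placeholders); the second `FsStateBackend` constructor argument requests asynchronous snapshots:

{% highlight scala %}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object AsyncSnapshotSetup extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Heap-based ("filesystem") state backend; the boolean enables
  // asynchronous snapshots (the default behaviour since Flink 1.3).
  env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true))

  // Snapshots are only taken when checkpointing is enabled.
  env.enableCheckpointing(60 * 1000)
}
{% endhighlight %}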



[flink-web] branch asf-site updated: Rebuild website

2019-07-02 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 947ce88  Rebuild website
947ce88 is described below

commit 947ce88b7811825774d6681b535f483fabc14cfd
Author: sunjincheng121 
AuthorDate: Wed Jul 3 10:17:39 2019 +0800

Rebuild website
---
 .jekyll-metadata   | Bin 84463 -> 92656 bytes
 content/zh/flink-architecture.html |  49 ++---
 2 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/.jekyll-metadata b/.jekyll-metadata
index bba9081..74b3891 100644
Binary files a/.jekyll-metadata and b/.jekyll-metadata differ
diff --git a/content/zh/flink-architecture.html 
b/content/zh/flink-architecture.html
index c39218e..1739ba3 100644
--- a/content/zh/flink-architecture.html
+++ b/content/zh/flink-architecture.html
@@ -182,9 +182,9 @@
 
 
 
-Apache Flink is a framework and distributed processing engine for stateful 
computations over unbounded and bounded data streams. Flink has been 
designed to run in all common cluster environments, perform 
computations at in-memory speed and at any scale.
+Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 
能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
 
-Here, we explain important aspects of Flink’s architecture.
+接下来,我们来介绍一下 Flink 架构中的重要方面。
 
 
 
-Process Unbounded and Bounded 
Data
+处理无界和有界数据
 
-Any kind of data is produced as a stream of events. Credit card 
transactions, sensor measurements, machine logs, or user interactions on a 
website or mobile application, all of these data are generated as a stream.
+任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
 
-Data can be processed as unbounded or bounded streams.
+数据可以被作为 无界 或者 有界 流来处理。
 
 
   
-Unbounded streams have a start but no defined end. 
They do not terminate and provide data as it is generated. Unbounded streams 
must be continuously processed, i.e., events must be promptly handled after 
they have been ingested. It is not possible to wait for all input data to 
arrive because the input is unbounded and will not be complete at any point in 
time. Processing unbounded data often requires that events are ingested in a 
specific order, such as the order  [...]
+无界流 
有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
   
   
-Bounded streams have a defined start and end. Bounded 
streams can be processed by ingesting all data before performing any 
computations. Ordered ingestion is not required to process bounded streams 
because a bounded data set can always be sorted. Processing of bounded streams 
is also known as batch processing.
+有界流 
有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理
   
 
 
@@ -211,26 +211,26 @@
   
 
 
-Apache Flink excels at processing unbounded and bounded data 
sets. Precise control of time and state enable Flink’s runtime to run 
any kind of application on unbounded streams. Bounded streams are internally 
processed by algorithms and data structures that are specifically designed for 
fixed sized data sets, yielding excellent performance.
+Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 
的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
 
-Convince yourself by exploring the use cases 
that have been built on top of Flink.
+通过探索 Flink 之上构建的 用例 来加深理解。
 
-Deploy Applications Anywhere
+部署应用到任意地方
 
-Apache Flink is a distributed system and requires compute resources in 
order to execute applications. Flink integrates with all common cluster 
resource managers such as https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html";>Hadoop
 YARN, https://mesos.apache.org";>Apache Mesos, and https://kubernetes.io/";>Kubernetes but can also be setup to run as a 
stand-alone cluster.
+Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html";>Hadoop
 YARN、 https://mesos.apache.org";>Apache Mesos 和 https://kubernetes.io/";>Kubernetes,但同时也可以作为独立集群运行。
 
-Flink is designed to work well each of the previously listed resource 
managers. This is achieved by resource-manager-specific deployment modes that 
allow Flink to interact with each resource manager in its idiomatic way.
+Flink 
被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 
可以采用与当前资源管理器相适应的方式进行交互。
 
-When deploying a Flink application, Flink automatically identifies the 
required resources based on the application’s configured parallelism and 
requests them from the resource manager. In case of a failure, Flink replaces 
the failed container by requesting new resources. All communication to submit 
or control an application happens via REST calls. This eases the integration of 
Flink in many environmen

[flink-web] branch asf-site updated: [FLINK-11561][docs-zh] Translate "Flink Architecture" page into Chinese

2019-07-02 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 5a2e8a1  [FLINK-11561][docs-zh] Translate "Flink Architecture" page 
into Chinese
5a2e8a1 is described below

commit 5a2e8a1dae1c0567dcdef94ed32fe94b87441845
Author: tom_gong 
AuthorDate: Fri May 10 19:02:13 2019 +0800

[FLINK-11561][docs-zh] Translate "Flink Architecture" page into Chinese

This closes #214
---
 flink-architecture.zh.md | 49 
 1 file changed, 24 insertions(+), 25 deletions(-)

diff --git a/flink-architecture.zh.md b/flink-architecture.zh.md
index 580ab30..e3935e0 100644
--- a/flink-architecture.zh.md
+++ b/flink-architecture.zh.md
@@ -16,9 +16,9 @@ title: "Apache Flink 是什么?"
 
 
 
-Apache Flink is a framework and distributed processing engine for stateful 
computations over *unbounded and bounded* data streams. Flink has been designed 
to run in *all common cluster environments*, perform computations at *in-memory 
speed* and at *any scale*.
+Apache Flink 是一个框架和分布式处理引擎,用于在*无边界和有边界*数据流上进行有状态的计算。Flink 
能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
 
-Here, we explain important aspects of Flink's architecture.
+接下来,我们来介绍一下 Flink 架构中的重要方面。
 
 
 
-## Process Unbounded and Bounded Data
+## 处理无界和有界数据
 
-Any kind of data is produced as a stream of events. Credit card transactions, 
sensor measurements, machine logs, or user interactions on a website or mobile 
application, all of these data are generated as a stream. 
+任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
 
-Data can be processed as *unbounded* or *bounded* streams. 
+数据可以被作为 *无界* 或者 *有界* 流来处理。
 
-1. **Unbounded streams** have a start but no defined end. They do not 
terminate and provide data as it is generated. Unbounded streams must be 
continuously processed, i.e., events must be promptly handled after they have 
been ingested. It is not possible to wait for all input data to arrive because 
the input is unbounded and will not be complete at any point in time. 
Processing unbounded data often requires that events are ingested in a specific 
order, such as the order in which events o [...]
+1. **无界流** 
有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
 
-2. **Bounded streams** have a defined start and end. Bounded streams can be 
processed by ingesting all data before performing any computations. Ordered 
ingestion is not required to process bounded streams because a bounded data set 
can always be sorted. Processing of bounded streams is also known as batch 
processing.
+2. **有界流** 
有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理
 
 
   
 
 
-**Apache Flink excels at processing unbounded and bounded data sets.** Precise 
control of time and state enable Flink's runtime to run any kind of application 
on unbounded streams. Bounded streams are internally processed by algorithms 
and data structures that are specifically designed for fixed sized data sets, 
yielding excellent performance. 
+**Apache Flink 擅长处理无界和有界数据集** 精确的时间控制和状态化使得 Flink 
的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
 
-Convince yourself by exploring the [use cases]({{ site.baseurl 
}}/usecases.html) that have been built on top of Flink.
+通过探索 Flink 之上构建的 [用例]({{ site.baseurl }}/zh/usecases.html) 来加深理解。
 
-## Deploy Applications Anywhere
+## 部署应用到任意地方
 
-Apache Flink is a distributed system and requires compute resources in order 
to execute applications. Flink integrates with all common cluster resource 
managers such as [Hadoop 
YARN](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html),
 [Apache Mesos](https://mesos.apache.org), and 
[Kubernetes](https://kubernetes.io/) but can also be setup to run as a 
stand-alone cluster.
+Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 [Hadoop 
YARN](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html)、
 [Apache Mesos](https://mesos.apache.org) 和 
[Kubernetes](https://kubernetes.io/),但同时也可以作为独立集群运行。
 
-Flink is designed to work well each of the previously listed resource 
managers. This is achieved by resource-manager-specific deployment modes that 
allow Flink to interact with each resource manager in its idiomatic way. 
+Flink 
被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 
可以采用与当前资源管理器相适应的方式进行交互。
 
-When deploying a Flink application, Flink automatically identifies the 
required resources based on the application's configured parallelism and 
requests them from the resource manager. In case of a failure, Flink replaces 
the failed container by requesting new resources. All communication to submit 
or control an application happens via REST calls. This eases the integration

[flink] 03/05: [FLINK-12946][docs-zh] Translate "Apache NiFi Connector" page into Chinese

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5caed717570e14e0d45c49eccf409c3170034af1
Author: aloys 
AuthorDate: Sun Jun 23 00:54:09 2019 +0800

[FLINK-12946][docs-zh] Translate "Apache NiFi Connector" page into Chinese

This closes #8838
---
 docs/dev/connectors/nifi.zh.md | 43 +-
 1 file changed, 17 insertions(+), 26 deletions(-)

diff --git a/docs/dev/connectors/nifi.zh.md b/docs/dev/connectors/nifi.zh.md
index 97fd831..114092f 100644
--- a/docs/dev/connectors/nifi.zh.md
+++ b/docs/dev/connectors/nifi.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "Apache NiFi Connector"
+title: "Apache NiFi 连接器"
 nav-title: NiFi
 nav-parent_id: connectors
 nav-pos: 7
@@ -23,9 +23,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Source and Sink that can read from and write to
-[Apache NiFi](https://nifi.apache.org/). To use this connector, add the
-following dependency to your project:
+[Apache NiFi](https://nifi.apache.org/) 连接器提供了可以读取和写入的 Source 和 Sink。
+使用这个连接器,需要在工程中添加下面的依赖:
 
 {% highlight xml %}
 
@@ -35,30 +34,23 @@ following dependency to your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/projectsetup/dependencies.html)
-for information about how to package the program with the libraries for
-cluster execution.
+注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 
[这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
 
- Installing Apache NiFi
+ 安装 Apache NiFi
 
-Instructions for setting up a Apache NiFi cluster can be found
-[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
+安装 Apache NiFi 集群请参考 
[这里](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi)。
 
  Apache NiFi Source
 
-The connector provides a Source for reading data from Apache NiFi to Apache 
Flink.
+该连接器提供了一个 Source 可以用来从 Apache NiFi 读取数据到 Apache Flink。
 
-The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi.
+`NiFiSource(…)` 类有两个构造方法。
 
-- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` given 
the client's SiteToSiteConfig and a
- default wait time of 1000 ms.
+- `NiFiSource(SiteToSiteConfig config)` - 构造一个 `NiFiSource(…)` ,需要指定参数 
SiteToSiteConfig ,采用默认的等待时间 1000 ms。
 
-- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a 
`NiFiSource(…)` given the client's
- SiteToSiteConfig and the specified wait time (in milliseconds).
+- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - 构造一个 
`NiFiSource(…)`,需要指定参数 SiteToSiteConfig 和等待时间(单位为毫秒)。
 
-Example:
+示例:
 
 
 
@@ -89,18 +81,17 @@ val nifiSource = new NiFiSource(clientConfig)
 
 
 
-Here data is read from the Apache NiFi Output Port called "Data for Flink" 
which is part of Apache NiFi
-Site-to-site protocol configuration.
+数据从 Apache NiFi Output Port 读取,Apache NiFi Output Port 也被称为 "Data for Flink",是 
Apache NiFi Site-to-site 协议配置的一部分。
 
  Apache NiFi Sink
 
-The connector provides a Sink for writing data from Apache Flink to Apache 
NiFi.
+该连接器提供了一个 Sink 可以用来把 Apache Flink 的数据写入到 Apache NiFi。
 
-The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`.
+`NiFiSink(…)` 类只有一个构造方法。
 
-- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` constructs a 
`NiFiSink(…)` given the client's `SiteToSiteConfig` and a 
`NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be 
ingested by NiFi.
+- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` 构造一个 
`NiFiSink(…)`,需要指定 `SiteToSiteConfig` 和  `NiFiDataPacketBuilder` 参数 
,`NiFiDataPacketBuilder` 可以将Flink数据转化成可以被NiFi识别的 `NiFiDataPacket`.
 
-Example:
+示例:
 
 
 
@@ -135,6 +126,6 @@ streamExecEnv.addSink(nifiSink)
 
   
 
-More information about [Apache NiFi](https://nifi.apache.org) Site-to-Site 
Protocol can be found 
[here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site)
+更多关于 [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol 的信息请参考 
[这里](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site)。
 
 {% top %}



[flink] 04/05: [FLINK-12943][docs-zh] Translate "HDFS Connector" page into Chinese

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 345ac8868b705b7c9be9bb70e6fb54d1d15baa9b
Author: aloys 
AuthorDate: Wed Jun 26 01:22:22 2019 +0800

[FLINK-12943][docs-zh] Translate "HDFS Connector" page into Chinese

This closes #8897
---
 docs/dev/connectors/filesystem_sink.zh.md | 80 ---
 1 file changed, 31 insertions(+), 49 deletions(-)

diff --git a/docs/dev/connectors/filesystem_sink.zh.md 
b/docs/dev/connectors/filesystem_sink.zh.md
index f9a828d..54b0c64 100644
--- a/docs/dev/connectors/filesystem_sink.zh.md
+++ b/docs/dev/connectors/filesystem_sink.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "HDFS Connector"
+title: "HDFS 连接器"
 nav-title: Rolling File Sink
 nav-parent_id: connectors
 nav-pos: 5
@@ -23,9 +23,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Sink that writes partitioned files to any filesystem 
supported by
-[Hadoop FileSystem](http://hadoop.apache.org). To use this connector, add the
-following dependency to your project:
+这个连接器可以向所有 [Hadoop FileSystem](http://hadoop.apache.org) 支持的文件系统写入分区文件。
+使用前,需要在工程里添加下面的依赖:
 
 {% highlight xml %}
 
@@ -35,16 +34,11 @@ following dependency to your project:
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary
-distribution. See
-[here]({{site.baseurl}}/dev/projectsetup/dependencies.html)
-for information about how to package the program with the libraries for
-cluster execution.
+注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 
[这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
 
- Bucketing File Sink
+ 分桶文件 Sink
 
-The bucketing behaviour as well as the writing can be configured but we will 
get to that later.
-This is how you can create a bucketing sink which by default, sinks to rolling 
files that are split by time:
+关于分桶的配置我们后面会有讲述,这里先创建一个分桶 sink,默认情况下这个 sink 会将数据写入到按照时间切分的滚动文件中:
 
 
 
@@ -65,40 +59,30 @@ input.addSink(new BucketingSink[String]("/base/path"))
 
 
 
-The only required parameter is the base path where the buckets will be
-stored. The sink can be further configured by specifying a custom bucketer, 
writer and batch size.
-
-By default the bucketing sink will split by the current system time when 
elements arrive and will
-use the datetime pattern `"yyyy-MM-dd--HH"` to name the buckets. This pattern 
is passed to
-`DateTimeFormatter` with the current system time and JVM's default timezone to 
form a bucket path.
-Users can also specify a timezone for the bucketer to format bucket path. A 
new bucket will be created
-whenever a new date is encountered. For example, if you have a pattern that 
contains minutes as the
-finest granularity you will get a new bucket every minute. Each bucket is 
itself a directory that
-contains several part files: each parallel instance of the sink will create 
its own part file and
-when part files get too big the sink will also create a new part file next to 
the others. When a
-bucket becomes inactive, the open part file will be flushed and closed. A 
bucket is regarded as
-inactive when it hasn't been written to recently. By default, the sink checks 
for inactive buckets
-every minute, and closes any buckets which haven't been written to for over a 
minute. This
-behaviour can be configured with `setInactiveBucketCheckInterval()` and
-`setInactiveBucketThreshold()` on a `BucketingSink`.
-
-You can also specify a custom bucketer by using `setBucketer()` on a 
`BucketingSink`. If desired,
-the bucketer can use a property of the element or tuple to determine the 
bucket directory.
-
-The default writer is `StringWriter`. This will call `toString()` on the 
incoming elements
-and write them to part files, separated by newline. To specify a custom writer 
use `setWriter()`
-on a `BucketingSink`. If you want to write Hadoop SequenceFiles you can use 
the provided
-`SequenceFileWriter` which can also be configured to use compression.
-
-There are two configuration options that specify when a part file should be 
closed
-and a new one started:
+初始化时只需要一个参数,这个参数表示分桶文件存储的路径。分桶 sink 可以通过指定自定义的 bucketer、 writer 和 batch 值进一步配置。
+
+默认情况下,当数据到来时,分桶 sink 会按照系统时间对数据进行切分,并以 `"yyyy-MM-dd--HH"` 的时间格式给每个桶命名。然后 
+`DateTimeFormatter` 按照这个时间格式将当前系统时间以 JVM 默认时区转换成分桶的路径。用户可以自定义时区来生成
+分桶的路径。每遇到一个新的日期都会产生一个新的桶。例如,如果时间的格式以分钟为粒度,那么每分钟都会产生一个桶。每个桶都是一个目录,
+目录下包含了几个部分文件(part files):每个 sink 的并发实例都会创建一个属于自己的部分文件,当这些文件太大的时候,sink 
会产生新的部分文件。
+当一个桶不再活跃时,打开的部分文件会刷盘并且关闭。如果一个桶最近一段时间都没有写入,那么这个桶被认为是不活跃的。sink 默认会每分钟
+检查不活跃的桶、关闭那些超过一分钟没有写入的桶。这些行为可以通过 `BucketingSink` 的 
`setInactiveBucketCheckInterval()` 
+和 `setInactiveBucketThreshold()` 进行设置。
+
+可以调用`BucketingSink` 的 `setBucketer()` 方法指定自定义的 
bucketer,如果需要的话,也可以使用一个元素或者元组属性来决定桶的路径。
+
+默认的 writer 是 `StringWriter`。数据到达时,通过 `toString()` 
方法得到内容,内容以换行符分隔,`StringWriter` 将数据
+内容写入部分文件。可以通过 `BucketingSink` 的 `setWriter()`
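The email is cut off here by the archive, but the configuration knobs it has just described fit into a short sketch (the path, sizes, and intervals below are placeholders, not values from the docs page):

{% highlight scala %}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

object BucketingSinkExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val input: DataStream[String] = env.socketTextStream("localhost", 9999)

  val sink = new BucketingSink[String]("/base/path")                 // base path, the only required setting
    .setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))   // minute-granularity buckets
    .setWriter(new StringWriter[String]())                           // default writer, shown explicitly
    .setBatchSize(1024 * 1024 * 400)                                 // roll a part file at ~400 MB
    .setInactiveBucketCheckInterval(60 * 1000)                       // check for inactive buckets every minute
    .setInactiveBucketThreshold(60 * 1000)                           // close buckets idle for one minute

  input.addSink(sink)
  env.execute("Bucketing sink example")
}
{% endhighlight %}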

[flink] 01/05: [FLINK-12945][docs-zh] Translate "RabbitMQ Connector" page into Chinese

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6bf207166a5ca09c259d854989ccff18687745bc
Author: aloys 
AuthorDate: Sun Jun 23 20:19:26 2019 +0800

[FLINK-12945][docs-zh] Translate "RabbitMQ Connector" page into Chinese

This closes #8843
---
 docs/dev/connectors/rabbitmq.md| 10 +++---
 docs/dev/connectors/rabbitmq.zh.md | 73 ++
 2 files changed, 32 insertions(+), 51 deletions(-)

diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index 838db2a..a3a56a8 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -23,7 +23,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# License of the RabbitMQ Connector
+## License of the RabbitMQ Connector
 
 Flink's RabbitMQ connector defines a Maven dependency on the
 "RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public 
License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the 
Apache License version 2 ("ASL").
@@ -35,7 +35,7 @@ Users that create and publish derivative work based on Flink's
 RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
 must be aware that this may be subject to conditions declared in the Mozilla 
Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") 
and the Apache License version 2 ("ASL").
 
-# RabbitMQ Connector
+## RabbitMQ Connector
 
 This connector provides access to data streams from 
[RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following 
dependency to your project:
 
@@ -49,10 +49,10 @@ This connector provides access to data streams from 
[RabbitMQ](http://www.rabbit
 
 Note that the streaming connectors are currently not part of the binary 
distribution. See linking with them for cluster execution 
[here]({{site.baseurl}}/dev/projectsetup/dependencies.html).
 
- Installing RabbitMQ
+### Installing RabbitMQ
 Follow the instructions from the [RabbitMQ download 
page](http://www.rabbitmq.com/download.html). After the installation the server 
automatically starts, and the application connecting to RabbitMQ can be 
launched.
 
- RabbitMQ Source
+### RabbitMQ Source
 
 This connector provides a `RMQSource` class to consume messages from a RabbitMQ
 queue. This source provides three different levels of guarantees, depending
@@ -131,7 +131,7 @@ val stream = env
 
 
 
- RabbitMQ Sink
+### RabbitMQ Sink
 This connector provides a `RMQSink` class for sending messages to a RabbitMQ
 queue. Below is a code example for setting up a RabbitMQ sink.
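Since the code examples themselves are not part of this diff, here is a minimal sketch of the `RMQSource`/`RMQSink` wiring the page documents (connection parameters and queue names below are placeholders):

{% highlight scala %}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.{RMQSink, RMQSource}
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig

object RabbitMQExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build()

  // Consume from a queue; the boolean enables correlation-id de-duplication,
  // which only takes effect with exactly-once checkpointing.
  val stream: DataStream[String] = env.addSource(
    new RMQSource[String](connectionConfig, "source-queue", true, new SimpleStringSchema()))

  // Publish the (here unchanged) records to another queue.
  stream.addSink(new RMQSink[String](connectionConfig, "sink-queue", new SimpleStringSchema()))

  env.execute("RabbitMQ example")
}
{% endhighlight %}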
 
diff --git a/docs/dev/connectors/rabbitmq.zh.md 
b/docs/dev/connectors/rabbitmq.zh.md
index 838db2a..e213d3f 100644
--- a/docs/dev/connectors/rabbitmq.zh.md
+++ b/docs/dev/connectors/rabbitmq.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "RabbitMQ Connector"
+title: "RabbitMQ 连接器"
 nav-title: RabbitMQ
 nav-parent_id: connectors
 nav-pos: 6
@@ -23,21 +23,17 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# License of the RabbitMQ Connector
+## RabbitMQ 连接器的许可证
 
-Flink's RabbitMQ connector defines a Maven dependency on the
-"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public 
License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the 
Apache License version 2 ("ASL").
+Flink 的 RabbitMQ 连接器依赖了 "RabbitMQ AMQP Java Client",它基于三种协议下发行:Mozilla Public 
License 1.1 ("MPL")、GNU General Public License version 2 ("GPL") 和 Apache 
License version 2 ("ASL")。
 
-Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
-nor packages binaries from the "RabbitMQ AMQP Java Client".
+Flink 自身既没有复用 "RabbitMQ AMQP Java Client" 的代码,也没有将 "RabbitMQ AMQP Java Client" 
打二进制包。
 
-Users that create and publish derivative work based on Flink's
-RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
-must be aware that this may be subject to conditions declared in the Mozilla 
Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") 
and the Apache License version 2 ("ASL").
+如果用户发布的内容是基于 Flink 的 RabbitMQ 连接器的(进而重新发布了 "RabbitMQ AMQP Java Client" 
),那么一定要注意这可能会受到 Mozilla Public License 1.1 ("MPL")、GNU General Public License 
version 2 ("GPL")、Apache License version 2 ("ASL") 协议的限制.
 
-# RabbitMQ Connector
+## RabbitMQ 连接器
 
-This connector provides access to data streams from 
[RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following 
dependency to your project:
+这个连接器可以访问 [RabbitMQ](http://www.rabbitmq.com/) 的数据流。使用这个连接器,需要在工程里添加下面的依赖:
 
 {% highlight xml %}
 
@@ -47,44 +43,30 @@ This connector provides access to data streams from 
[RabbitMQ](http://www.rabbit
 
 {% endhighlight %}
 
-Note that the streaming connectors are currently not part of the binary 
distribution. See linking with them fo

[flink] 05/05: [FLINK-12944][docs-zh] Translate "Streaming File Sink" page into Chinese

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29a0e8e5e7d69b0450ac3865c25df0e5d75d758b
Author: aloys 
AuthorDate: Fri Jun 28 00:39:56 2019 +0800

[FLINK-12944][docs-zh] Translate "Streaming File Sink" page into Chinese

This closes #8918
---
 docs/dev/connectors/streamfile_sink.zh.md | 93 ---
 1 file changed, 35 insertions(+), 58 deletions(-)

diff --git a/docs/dev/connectors/streamfile_sink.zh.md 
b/docs/dev/connectors/streamfile_sink.zh.md
index 40b104b..79586ad 100644
--- a/docs/dev/connectors/streamfile_sink.zh.md
+++ b/docs/dev/connectors/streamfile_sink.zh.md
@@ -23,30 +23,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This connector provides a Sink that writes partitioned files to filesystems
-supported by the [Flink `FileSystem` abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+这个连接器提供了一个 Sink 来将分区文件写入到支持 [Flink `FileSystem`]({{ 
site.baseurl}}/zh/ops/filesystems/index.html) 接口的文件系统中。
 
-Since in streaming the input is potentially infinite, the streaming file sink 
writes data
-into buckets. The bucketing behaviour is configurable but a useful default is 
time-based
-bucketing where we start writing a new bucket every hour and thus get
-individual files that each contain a part of the infinite output stream.
+由于在流处理中输入可能是无限的,所以流处理的文件 sink 会将数据写入到桶中。如何分桶是可以配置的,一种有效的默认
+策略是基于时间的分桶,这种策略每个小时写入一个新的桶,这些桶各包含了无限输出流的一部分数据。
 
-Within a bucket, we further split the output into smaller part files based on a
-rolling policy. This is useful to prevent individual bucket files from getting
-too big. This is also configurable but the default policy rolls files based on
-file size and a timeout, *i.e* if no new data was written to a part file.
+在一个桶内部,会进一步将输出基于滚动策略切分成更小的文件。这有助于防止桶文件变得过大。滚动策略也是可以配置的,默认
+策略会根据文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间。
 
-The `StreamingFileSink` supports both row-wise encoding formats and
-bulk-encoding formats, such as [Apache Parquet](http://parquet.apache.org).
+`StreamingFileSink` 支持行编码格式和批量编码格式,比如 [Apache 
Parquet](http://parquet.apache.org)。
 
- Using Row-encoded Output Formats
+ 使用行编码输出格式
 
-The only required configuration are the base path where we want to output our
-data and an
-[Encoder]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/Encoder.html)
-that is used for serializing records to the `OutputStream` for each file.
+只需要配置一个输出路径和一个 [Encoder]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/Encoder.html)。
+Encoder负责为每个文件的 `OutputStream` 序列化数据。
 
-Basic usage thus looks like this:
+基本用法如下:
 
 
 
@@ -84,55 +76,40 @@ input.addSink(sink)
 
 
 
-This will create a streaming sink that creates hourly buckets and uses a
-default rolling policy. The default bucket assigner is
+上面的代码创建了一个按小时分桶、按默认策略滚动的 sink。默认分桶器是
 [DateTimeBucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
-and the default rolling policy is
-[DefaultRollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html).
-You can specify a custom
+,默认滚动策略是
+[DefaultRollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html)。
+可以为 sink builder 自定义
 [BucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html)
-and
-[RollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)
-on the sink builder. Please check out the JavaDoc for
-[StreamingFileSink]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html)
-for more configuration options and more documentation about the workings and
-interactions of bucket assigners and rolling policies.
+和
+[RollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)。
+更多配置操作以及分桶器和滚动策略的工作机制和相互影响请参考:
+[StreamingFileSink]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html)。
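The row-format example referenced above is not reproduced in this archived diff; a minimal sketch of the defaults-plus-custom-rolling-policy setup it describes, written against roughly the Flink 1.8 builder API (path and thresholds are placeholders):

{% highlight scala %}
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object StreamingFileSinkExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(60 * 1000) // part files are finalized on checkpoints

  val input: DataStream[String] = env.socketTextStream("localhost", 9999)

  val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
      DefaultRollingPolicy.create()
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at the latest every 15 minutes
        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // ... or after 5 minutes without new data
        .withMaxPartSize(1024 * 1024 * 128)                   // ... or once a part file reaches 128 MB
        .build())
    .build()

  input.addSink(sink)
  env.execute("Streaming file sink example")
}
{% endhighlight %}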
 
- Using Bulk-encoded Output Formats
+ 使用批量编码输出格式
 
-In the above example we used an `Encoder` that can encode or serialize each
-record individually. The streaming file sink also supports bulk-encoded output
-formats such as [Apache Parquet](http://parquet.apache.org). To use these,
-instead of `StreamingFileSink.forRowFormat()` you would use
-`StreamingFileSink.forBulkFormat()` and specify a `BulkWriter.Factory`.
+上面的示例使用 `Encoder` 分别序列化每一个记录。除此之外,流式文件 sink 还支持批量编码的输出格式,比如 [Apache 
Parquet](http://pa

[flink] 02/05: [FLINK-12938][docs-zh] Translate "Streaming Connectors" page into Chinese

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bda485fc077453ff0a555c3f25e702bfd0a1f339
Author: aloys 
AuthorDate: Thu Jun 27 16:17:40 2019 +0800

[FLINK-12938][docs-zh] Translate "Streaming Connectors" page into Chinese

This closes #8837
---
 docs/dev/connectors/index.zh.md | 49 +
 1 file changed, 20 insertions(+), 29 deletions(-)

diff --git a/docs/dev/connectors/index.zh.md b/docs/dev/connectors/index.zh.md
index b5405d4..aabcdbd 100644
--- a/docs/dev/connectors/index.zh.md
+++ b/docs/dev/connectors/index.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "Streaming Connectors"
+title: "流式连接器"
 nav-id: connectors
 nav-title: Connectors
 nav-parent_id: streaming
@@ -28,16 +28,15 @@ under the License.
 * toc
 {:toc}
 
-## Predefined Sources and Sinks
+## 预定义的 Source 和 Sink
 
-A few basic data sources and sinks are built into Flink and are always 
available.
-The [predefined data sources]({{ site.baseurl 
}}/dev/datastream_api.html#data-sources) include reading from files, 
directories, and sockets, and
-ingesting data from collections and iterators.
-The [predefined data sinks]({{ site.baseurl 
}}/dev/datastream_api.html#data-sinks) support writing to files, to stdout and 
stderr, and to sockets.
+一些比较基本的 Source 和 Sink 已经内置在 Flink 里。
+[预定义 data sources]({{ site.baseurl }}/zh/dev/datastream_api.html#data-sources) 
支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。
+[预定义 data sinks]({{ site.baseurl }}/zh/dev/datastream_api.html#data-sinks) 
支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。
 
-## Bundled Connectors
+## 附带的连接器
 
-Connectors provide code for interfacing with various third-party systems. 
Currently these systems are supported:
+连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:
 
  * [Apache Kafka](kafka.html) (source/sink)
  * [Apache Cassandra](cassandra.html) (sink)
@@ -48,15 +47,13 @@ Connectors provide code for interfacing with various 
third-party systems. Curren
  * [Apache NiFi](nifi.html) (source/sink)
  * [Twitter Streaming API](twitter.html) (source)
 
-Keep in mind that to use one of these connectors in an application, additional 
third party
-components are usually required, e.g. servers for the data stores or message 
queues.
-Note also that while the streaming connectors listed in this section are part 
of the
-Flink project and are included in source releases, they are not included in 
the binary distributions. 
-Further instructions can be found in the corresponding subsections.
+请记住,在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列。
+要注意这些列举的连接器是 Flink 工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。
+更多说明可以参考对应的子部分。
 
-## Connectors in Apache Bahir
+## Apache Bahir 中的连接器
 
-Additional streaming connectors for Flink are being released through [Apache 
Bahir](https://bahir.apache.org/), including:
+Flink 还有些一些额外的连接器通过 [Apache Bahir](https://bahir.apache.org/) 发布, 包括:
 
  * [Apache 
ActiveMQ](https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/)
 (source/sink)
  * [Apache 
Flume](https://bahir.apache.org/docs/flink/current/flink-streaming-flume/) 
(sink)
@@ -64,23 +61,17 @@ Additional streaming connectors for Flink are being 
released through [Apache Bah
  * [Akka](https://bahir.apache.org/docs/flink/current/flink-streaming-akka/) 
(sink)
  * [Netty](https://bahir.apache.org/docs/flink/current/flink-streaming-netty/) 
(source)
 
-## Other Ways to Connect to Flink
+## 连接Fink的其他方法
 
-### Data Enrichment via Async I/O
+### 异步 I/O
 
-Using a connector isn't the only way to get data in and out of Flink.
-One common pattern is to query an external database or web service in a `Map` 
or `FlatMap`
-in order to enrich the primary datastream.
-Flink offers an API for [Asynchronous I/O]({{ site.baseurl 
}}/dev/stream/operators/asyncio.html)
-to make it easier to do this kind of enrichment efficiently and robustly.
+使用connector并不是唯一可以使数据进入或者流出Flink的方式。
+一种常见的模式是从外部数据库或者 Web 服务查询数据得到初始数据流,然后通过 `Map` 或者 `FlatMap` 对初始数据流进行丰富和增强。
+Flink 提供了[异步 I/O]({{ site.baseurl }}/zh/dev/stream/operators/asyncio.html) API 
来让这个过程更加简单、高效和稳定。
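As a rough illustration of the enrichment pattern just described (not part of the patched page; the external lookup is simulated and all names are placeholders):

{% highlight scala %}
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

import scala.concurrent.{ExecutionContext, Future}

object AsyncEnrichmentExample extends App {

  // Stand-in for a real external lookup (database, web service, ...).
  class EnrichFromService extends AsyncFunction[String, (String, String)] {
    implicit lazy val ec: ExecutionContext = ExecutionContext.global

    override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit = {
      Future {
        key -> s"details-of-$key" // pretend this came back from the external system
      }.foreach(enriched => resultFuture.complete(Iterable(enriched)))
    }
  }

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val keys: DataStream[String] = env.fromElements("a", "b", "c")

  // Up to 100 in-flight requests, each with a 5 second timeout.
  val enriched = AsyncDataStream.unorderedWait(keys, new EnrichFromService, 5, TimeUnit.SECONDS, 100)

  enriched.print()
  env.execute("Async I/O enrichment example")
}
{% endhighlight %}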
 
-### Queryable State
+### 可查询状态
 
-When a Flink application pushes a lot of data to an external data store, this
-can become an I/O bottleneck.
-If the data involved has many fewer reads than writes, a better approach can be
-for an external application to pull from Flink the data it needs.
-The [Queryable State]({{ site.baseurl 
}}/dev/stream/state/queryable_state.html) interface
-enables this by allowing the state being managed by Flink to be queried on 
demand.
+当 Flink 应用程序需要向外部存储推送大量数据时会导致 I/O 瓶颈问题出现。在这种场景下,如果对数据的读操作远少于写操作,那么让外部应用从 Flink 
拉取所需的数据会是一种更好的方式。
+[可查询状态]({{ site.baseurl }}/zh/dev/stream/state/queryable_state.html) 
接口可以实现这个功能,该接口允许被 Flink 托管的状态可以被按需查询。
 
 {% top %}
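To make the "pull from Flink" idea in the last section of this page concrete, a hedged sketch of the client side (host, port, job id, key, and state names are placeholders; it assumes the flink-queryable-state-client-java dependency and a running job that called `setQueryable("visit-count")` on a matching `ValueStateDescriptor`):

{% highlight scala %}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient

object QueryableStateLookup extends App {
  // External application querying state that a running job exposed via
  // stateDescriptor.setQueryable("visit-count").
  val client = new QueryableStateClient("queryable-state-proxy-host", 9069)

  val descriptor = new ValueStateDescriptor[java.lang.Long]("visits", classOf[java.lang.Long])

  val resultFuture = client.getKvState(
    JobID.fromHexString("00000000000000000000000000000000"), // job to query (placeholder id)
    "visit-count",                                            // name passed to setQueryable(...)
    "some-key",                                               // key of the keyed state
    BasicTypeInfo.STRING_TYPE_INFO,                           // key type
    descriptor)

  val state = resultFuture.get() // block here for the sketch; real code would compose the future
  println(s"current value: ${state.value()}")
}
{% endhighlight %}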



[flink] branch master updated (686bc84 -> 29a0e8e)

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 686bc84  [hotfix][python] Remove the redundant 'python setup.py 
install' from tox.ini
 new 6bf2071  [FLINK-12945][docs-zh] Translate "RabbitMQ Connector" page 
into Chinese
 new bda485f  [FLINK-12938][docs-zh] Translate "Streaming Connectors" page 
into Chinese
 new 5caed71  [FLINK-12946][docs-zh] Translate "Apache NiFi Connector" page 
into Chinese
 new 345ac88  [FLINK-12943][docs-zh] Translate "HDFS Connector" page into 
Chinese
 new 29a0e8e  [FLINK-12944][docs-zh] Translate "Streaming File Sink" page 
into Chinese

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/connectors/filesystem_sink.zh.md | 80 +++---
 docs/dev/connectors/index.zh.md   | 49 +++-
 docs/dev/connectors/nifi.zh.md| 43 ++
 docs/dev/connectors/rabbitmq.md   | 10 ++--
 docs/dev/connectors/rabbitmq.zh.md| 73 +---
 docs/dev/connectors/streamfile_sink.zh.md | 93 ---
 6 files changed, 135 insertions(+), 213 deletions(-)



[flink-web] branch asf-site updated: Rebuild website

2019-07-02 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new b28e44e  Rebuild website
b28e44e is described below

commit b28e44efb01ee70d24899f406884d9f507c844e4
Author: sunjincheng121 
AuthorDate: Wed Jul 3 09:30:26 2019 +0800

Rebuild website
---
 .jekyll-metadata | Bin 0 -> 84463 bytes
 content/blog/feed.xml| 147 +++
 content/blog/index.html  |  38 ++-
 content/blog/page2/index.html|  38 +--
 content/blog/page3/index.html|  40 +--
 content/blog/page4/index.html|  40 +--
 content/blog/page5/index.html|  40 +--
 content/blog/page6/index.html|  38 ++-
 content/blog/page7/index.html|  36 ++-
 content/blog/page8/index.html|  38 +--
 content/blog/page9/index.html|  25 ++
 content/contributing/contribute-code.html|   2 +-
 content/contributing/improve-website.html|   2 +-
 content/downloads.html   |  28 +--
 content/index.html   |   8 +-
 content/news/2019/07/02/release-1.8.1.html   | 355 +++
 content/zh/contributing/improve-website.html |   2 +-
 content/zh/downloads.html|  32 +--
 content/zh/index.html|   8 +-
 19 files changed, 764 insertions(+), 153 deletions(-)

diff --git a/.jekyll-metadata b/.jekyll-metadata
new file mode 100644
index 000..bba9081
Binary files /dev/null and b/.jekyll-metadata differ
diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index cfadfc2..1c51569 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,153 @@
 https://flink.apache.org/blog/feed.xml"; rel="self" 
type="application/rss+xml" />
 
 
+Apache Flink 1.8.1 Released
+

+The Apache Flink community released the first bugfix version of the Apache Flink 1.8 series.
+
+This release includes more than 40 fixes and minor improvements for Flink 1.8.1. The list below includes a detailed list of all improvements, sub-tasks and bug fixes.
+
+We highly recommend all users to upgrade to Flink 1.8.1.
+
+Updated Maven dependencies:
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.8.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_2.11</artifactId>
+  <version>1.8.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_2.11</artifactId>
+  <version>1.8.1</version>
+</dependency>
+
+You can find the binaries on the updated Downloads page.
+
+List of resolved issues:
+
+Sub-task
+
+  • [FLINK-10921] - Prioritize shard consumers in Kinesis Consumer by event time
+  • [FLINK-12617] - StandaloneJobClusterEntrypoint should default to random JobID for non-HA setups
+
+Bug
+
+  • [FLINK-9445] - scala-shell uses plain java command
+  • [FLINK-10455] - Potential Kafka producer leak in case of failures
+  • [FLINK-10941] - Slots prematurely released which still contain unconsumed data
+  • [FLINK-11059] - JobMaster may continue using an invalid slot if releasing idle slot meet a timeout
+  • [FLINK-11107] - Avoid me

[flink-web] branch asf-site updated: Add Apache Flink release 1.8.1

2019-07-02 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 70bcf43  Add Apache Flink release 1.8.1
70bcf43 is described below

commit 70bcf439500ba2565a027f2c137101268bf62027
Author: sunjincheng121 
AuthorDate: Mon Jun 17 16:04:54 2019 +0800

Add Apache Flink release 1.8.1
---
 _config.yml|  54 ++---
 _posts/2019-07-02-release-1.8.1.md | 152 +
 downloads.md   |   2 +-
 downloads.zh.md|   2 +-
 4 files changed, 181 insertions(+), 29 deletions(-)

diff --git a/_config.yml b/_config.yml
index a209cbd..11eea16 100644
--- a/_config.yml
+++ b/_config.yml
@@ -9,7 +9,7 @@ url: https://flink.apache.org
 
 DOCS_BASE_URL: https://ci.apache.org/projects/flink/
 
-FLINK_VERSION_STABLE: 1.8.0
+FLINK_VERSION_STABLE: 1.8.1
 FLINK_VERSION_STABLE_SHORT: 1.8
 
 FLINK_VERSION_LATEST: 1.9-SNAPSHOT
@@ -55,23 +55,23 @@ flink_releases:
   -
   version_short: 1.8
   binary_release:
-  name: "Apache Flink 1.8.0"
+  name: "Apache Flink 1.8.1"
   scala_211:
-  id: "180-download_211"
-  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz";
-  asc_url: 
"https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz.asc";
-  sha512_url: 
"https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz.sha512";
+  id: "181-download_211"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.11.tgz";
+  asc_url: 
"https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.11.tgz.asc";
+  sha512_url: 
"https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.11.tgz.sha512";
   scala_212:
-  id: "180-download_212"
-  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz";
-  asc_url: 
"https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz.asc";
-  sha512_url: 
"https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz.sha512";
+  id: "181-download_212"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.12.tgz";
+  asc_url: 
"https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.12.tgz.asc";
+  sha512_url: 
"https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.12.tgz.sha512";
   source_release:
-  name: "Apache Flink 1.8.0"
-  id: "180-download-source"
-  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.8.0/flink-1.8.0-src.tgz";
-  asc_url: 
"https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-src.tgz.asc";
-  sha512_url: 
"https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-src.tgz.sha512";
+  name: "Apache Flink 1.8.1"
+  id: "181-download-source"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-1.8.1/flink-1.8.1-src.tgz";
+  asc_url: 
"https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-src.tgz.asc";
+  sha512_url: 
"https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-src.tgz.sha512";
   optional_components:
 -
   name: "Pre-bundled Hadoop 2.4.1"
@@ -109,26 +109,26 @@ flink_releases:
   name: "Avro SQL Format"
   category: "SQL Formats"
   scala_dependent: false
-  id: 180-sql-format-avro
-  url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.0/flink-avro-1.8.0.jar
-  asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.0/flink-avro-1.8.0.jar.asc
-  sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.0/flink-avro-1.8.0.jar.sha1
+  id: 181-sql-format-avro
+  url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.1/flink-avro-1.8.1.jar
+  asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.1/flink-avro-1.8.1.jar.asc
+  sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.1/flink-avro-1.8.1.jar.sha1
 -
   name: "CSV SQL Format"
   category: "SQL Formats"
   scala_dependent: false
-  id: 180-sql-format-csv
-  url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.0/flink-csv-1.8.0.jar
-  asc_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.0/flink-csv-1.8.0.jar.asc
-  sha_url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.0

[flink] branch master updated: [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini

2019-07-02 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 686bc84  [hotfix][python] Remove the redundant 'python setup.py 
install' from tox.ini
686bc84 is described below

commit 686bc841398dd14f054df8bf97c6d9ef8d8d99d9
Author: Dian Fu 
AuthorDate: Fri Jun 28 20:57:04 2019 +0800

[hotfix][python] Remove the redundant 'python setup.py install' from tox.ini

This closes #8947
---
 flink-python/tox.ini | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index c475adf..e0a6c29 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -30,7 +30,6 @@ deps =
 pytest
 commands =
 python --version
-python setup.py install --force
 pytest
 bash ./dev/run_pip_test.sh
 



[flink] branch master updated: [FLINK-13048][hive] support decimal in Flink's integration with Hive user defined functions

2019-07-02 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 64bb08d  [FLINK-13048][hive] support decimal in Flink's integration 
with Hive user defined functions
64bb08d is described below

commit 64bb08d776ea4d153a74bbe992b59e52c3044425
Author: bowen.li 
AuthorDate: Mon Jul 1 16:29:36 2019 -0700

[FLINK-13048][hive] support decimal in Flink's integration with Hive user 
defined functions

This PR adds support for decimal in Flink's integration with Hive user 
defined functions.

This closes #8941.
---
 .../functions/hive/conversion/HiveInspectors.java  | 17 ++-
 .../table/functions/hive/HiveGenericUDAFTest.java  | 21 +++
 .../table/functions/hive/HiveGenericUDFTest.java   | 24 +++---
 .../table/functions/hive/HiveSimpleUDFTest.java| 13 
 4 files changed, 58 insertions(+), 17 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index bbd0b55..0f9dd67 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
@@ -58,6 +59,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspect
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector;
@@ -94,6 +96,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
+import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -165,7 +168,6 @@ public class HiveInspectors {
case TIMESTAMP:
return new 
JavaConstantTimestampObjectInspector((Timestamp) value);
case DECIMAL:
-   // TODO: Needs more testing
return new 
JavaConstantHiveDecimalObjectInspector((HiveDecimal) value);
case BINARY:
return new 
JavaConstantBinaryObjectInspector((byte[]) value);
@@ -200,9 +202,9 @@ public class HiveInspectors {
return o -> new HiveChar((String) o, 
((CharType) dataType).getLength());
} else if (inspector instanceof 
HiveVarcharObjectInspector) {
return o -> new HiveVarchar((String) o, 
((VarCharType) dataType).getLength());
+   } else if (inspector instanceof 
HiveDecimalObjectInspector) {
+   return o -> HiveDecimal.create((BigDecimal) o);
}
-
-   // TODO: handle decimal type
}
 
if (inspector instanceof ListObjectInspector) {
@@ -299,9 +301,11 @@ public class HiveInspectors {
HiveVarcharObjectInspector oi = 
(HiveVarcharObjectInspector) inspector;
 
return 
oi.getPrimitiveJavaObject(data).getValue();
-   }
+   } else if (inspector instanceof 
HiveDecimalObjectInspector) {
+   HiveDecimalObjectInspector oi = 
(HiveDecimalObjectInspector) inspector;
 
-   // TODO: handle decimal type
+   return 
oi.getPrimitiveJavaObject(data).bigDecimalValue();
+   }
}
 
if (inspector instanceof ListObjectI
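
The conversion added above is a thin wrapper around Hive's own decimal type. A minimal, self-contained sketch of the BigDecimal/HiveDecimal round trip the new branches rely on (assuming only that Hive's common classes are on the classpath; the class name and values are illustrative, not taken from the commit):

    import org.apache.hadoop.hive.common.type.HiveDecimal;

    import java.math.BigDecimal;

    public class DecimalConversionSketch {

        public static void main(String[] args) {
            // Flink -> Hive: wrap a BigDecimal before handing it to a Hive UDF argument
            BigDecimal flinkValue = new BigDecimal("123.4500");
            HiveDecimal hiveValue = HiveDecimal.create(flinkValue);

            // Hive -> Flink: unwrap a UDF result back into a BigDecimal
            BigDecimal back = hiveValue.bigDecimalValue();

            System.out.println(hiveValue + " / " + back);
        }
    }

Note that HiveDecimal may normalize the scale, so the value that comes back is numerically equal but not necessarily character-for-character identical to the input.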

[flink] branch master updated: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive

2019-07-02 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 011006f  [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to 
be consistent with standard name in Hive
011006f is described below

commit 011006f81c11e0d0c8bb37c20e06daad1dcb7148
Author: bowen.li 
AuthorDate: Mon Jul 1 12:42:41 2019 -0700

[FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be consistent 
with standard name in Hive

This PR renames the SQL CLI config key for HiveCatalog from hive-site-path to hive-conf-dir, which is consistent with the standard Hive config key name.

This closes #8939.
---
 docs/dev/table/sqlClient.md| 10 +-
 .../apache/flink/table/catalog/hive/HiveCatalog.java   | 10 +-
 .../hive/descriptors/HiveCatalogDescriptor.java|  4 ++--
 .../catalog/hive/descriptors/HiveCatalogValidator.java |  4 ++--
 .../catalog/hive/factories/HiveCatalogFactory.java | 18 +-
 .../flink-sql-client/conf/sql-client-defaults.yaml |  2 +-
 6 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 9297b2d..cd69cce 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -228,12 +228,12 @@ catalogs:
- name: catalog_1
  type: hive
  property-version: 1
- hive-site-path: file://...
+ hive-conf-dir: ...
- name: catalog_2
  type: hive
  property-version: 1
  default-database: mydb2# optional: name of default database of 
this catalog
- hive-site-path: file://... # optional: path of the hive-site.xml 
file. (Default value is created by HiveConf)
+ hive-conf-dir: ... # optional: path of Hive conf directory. 
(Default value is created by HiveConf)
  hive-version: 1.2.1# optional: version of Hive (2.3.4 by 
default)
 {% endhighlight %}
 
@@ -245,7 +245,7 @@ This configuration:
 - specifies a parallelism of 1 for queries executed in this streaming 
environment,
 - specifies an event-time characteristic, and
 - runs queries in the `table` result mode.
-- creates two `HiveCatalog` (type: hive) named with their own default 
databases and specified hive site path. Hive version of the first `HiveCatalog` 
is `2.3.4` by default and that of the second one is specified as `1.2.1`.
+- creates two `HiveCatalog` (type: hive) named with their own default 
databases and specified Hive conf directory. Hive version of the first 
`HiveCatalog` is `2.3.4` by default and that of the second one is specified as 
`1.2.1`.
 - use `catalog_1` as the current catalog of the environment upon start, and 
`mydb1` as the current database of the catalog.
 
 Depending on the use case, a configuration can be split into multiple files. 
Therefore, environment files can be created for general purposes (*defaults 
environment file* using `--defaults`) as well as on a per-session basis 
(*session environment file* using `--environment`). Every CLI session is 
initialized with the default properties followed by the session properties. For 
example, the defaults environment file could specify all table sources that 
should be available for querying in ev [...]
@@ -447,11 +447,11 @@ catalogs:
  property-version: 1
  default-database: mydb2
  hive-version: 1.2.1
- hive-site-path: 
+ hive-conf-dir: 
- name: catalog_2
  type: hive
  property-version: 1
- hive-site-path: 
+ hive-conf-dir: 
 {% endhighlight %}
 
 Currently Flink supports two types of catalog - `FlinkInMemoryCatalog` and 
`HiveCatalog`.
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 03ddceb..00e7e69 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -118,10 +118,10 @@ public class HiveCatalog extends AbstractCatalog {
 
private HiveMetastoreClientWrapper client;
 
-   public HiveCatalog(String catalogName, @Nullable String 
defaultDatabase, @Nullable URL hiveSiteUrl, String hiveVersion) {
+   public HiveCatalog(String catalogName, @Nullable String 
defaultDatabase, @Nullable URL hiveConfDir, String hiveVersion) {
this(catalogName,
defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
-   createHiveConf(hiveSiteUrl),
+   createHiveConf(hiveConfDir),
hiveVersion);
}
 
@@ -136,10 +136,10 @@ public class HiveCatalog extends AbstractCatalog {
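
For orientation, a minimal sketch of the renamed constructor parameter as it appears in the hunk above (catalog name, default database, conf URL, Hive version). The path and version strings are placeholders, and whether the URL should point at the conf directory or at a hive-site.xml file is decided by createHiveConf, which is not shown in full here:

    import org.apache.flink.table.catalog.hive.HiveCatalog;

    import java.net.URL;
    import java.nio.file.Paths;

    public class HiveCatalogSketch {

        public static void main(String[] args) throws Exception {
            // Placeholder location of the Hive configuration
            URL hiveConfDir = Paths.get("/etc/hive/conf").toUri().toURL();

            // Matches HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion) from the diff
            HiveCatalog catalog = new HiveCatalog("catalog_1", "mydb1", hiveConfDir, "2.3.4");
            System.out.println(catalog.getName());
        }
    }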

[flink] branch master updated: [FLINK-13021][table][hive] unify catalog partition implementations

2019-07-02 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 59ff00d  [FLINK-13021][table][hive] unify catalog partition 
implementations
59ff00d is described below

commit 59ff00d71d298fa61a92efa4fecd46f3cefc50f6
Author: bowen.li 
AuthorDate: Mon Jul 1 12:31:47 2019 -0700

[FLINK-13021][table][hive] unify catalog partition implementations

This PR unifies catalog partition implementations.

This closes #8926.
---
 .../flink/table/catalog/hive/HiveCatalog.java  | 54 ---
 .../table/catalog/hive/HiveCatalogConfig.java  | 17 ++
 .../table/catalog/hive/HiveCatalogPartition.java   | 61 --
 .../table/catalog/hive/HivePartitionConfig.java| 17 ++
 .../connectors/hive/HiveTableOutputFormatTest.java |  9 ++--
 .../batch/connectors/hive/HiveTableSinkTest.java   |  8 +--
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 19 ---
 flink-python/pyflink/table/catalog.py  | 26 +
 flink-python/pyflink/table/tests/test_catalog.py   |  8 +--
 ...logPartition.java => CatalogPartitionImpl.java} | 23 ++--
 .../table/catalog/GenericCatalogPartition.java | 52 --
 .../flink/table/catalog/CatalogTestBase.java   |  5 ++
 .../table/catalog/GenericInMemoryCatalogTest.java  | 26 -
 .../flink/table/catalog/CatalogPartition.java  |  7 +++
 .../apache/flink/table/catalog/CatalogTest.java| 40 +++---
 .../flink/table/catalog/CatalogTestUtil.java   | 13 +
 16 files changed, 111 insertions(+), 274 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 8659a80..03ddceb 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,15 +29,14 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
-import org.apache.flink.table.catalog.GenericCatalogPartition;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.config.CatalogTableConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -483,7 +482,7 @@ public class HiveCatalog extends AbstractCatalog {
if (isGeneric) {
properties = retrieveFlinkProperties(properties);
}
-   String comment = 
properties.remove(CatalogTableConfig.TABLE_COMMENT);
+   String comment = properties.remove(HiveCatalogConfig.COMMENT);
 
// Table schema
TableSchema tableSchema =
@@ -515,7 +514,7 @@ public class HiveCatalog extends AbstractCatalog {
 
Map properties = new 
HashMap<>(table.getProperties());
// Table comment
-   properties.put(CatalogTableConfig.TABLE_COMMENT, 
table.getComment());
+   properties.put(HiveCatalogConfig.COMMENT, table.getComment());
 
boolean isGeneric = 
Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
 
@@ -623,8 +622,10 @@ public class HiveCatalog extends AbstractCatalog {
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
checkNotNull(partition, "Partition cannot be null");
 
-   if (!(partition instanceof HiveCatalogPartition)) {
-   throw new CatalogException("Currently only supports 
HiveCatalogPartition");
+   boolean isGeneric = 
Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+   if (isGeneric) {
+   throw new CatalogException("Currently only supports 
non-generic CatalogPartition");
}
 
Table hiveTable = getHiveTable(tablePath);
@@ -715,7 +716,14 @@ public class HiveCatalog extends AbstractCatalog {
 
try {
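
A condensed restatement of the new guard above, outside of HiveCatalog: rather than testing for a HiveCatalogPartition subtype, the catalog now looks at the partition's own properties. The helper name below is hypothetical; the property key and the rejected case come from the diff:

    import org.apache.flink.table.catalog.CatalogPartition;
    import org.apache.flink.table.catalog.config.CatalogConfig;
    import org.apache.flink.table.catalog.exceptions.CatalogException;

    class PartitionGuardSketch {

        // Hypothetical helper mirroring the guard in HiveCatalog's partition handling
        static void ensureNonGeneric(CatalogPartition partition) {
            boolean isGeneric = Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC));
            if (isGeneric) {
                throw new CatalogException("Currently only supports non-generic CatalogPartition");
            }
        }
    }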
  

[flink] branch master updated: [FLINK-13047][table] Fix the Optional.orElse() usage issue in DatabaseCalciteSchema

2019-07-02 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3358cd3  [FLINK-13047][table] Fix the Optional.orElse() usage issue in 
DatabaseCalciteSchema
3358cd3 is described below

commit 3358cd3b6d6ffebb313b6c02f2a53e6dbd6ec1ed
Author: Xuefu Zhang 
AuthorDate: Mon Jul 1 15:25:44 2019 -0700

[FLINK-13047][table] Fix the Optional.orElse() usage issue in 
DatabaseCalciteSchema

This PR fixes the Optional.orElse() usage issue in DatabaseCalciteSchema: the argument passed to orElse() is evaluated eagerly even when the Optional already holds a value, so the fallback table source lookup has to be guarded explicitly.

This closes #8940.
---
 .../apache/flink/table/catalog/DatabaseCalciteSchema.java | 15 ---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index e629978..95475ef 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -104,9 +103,19 @@ class DatabaseCalciteSchema implements Schema {
}
 
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table) {
+   TableSource tableSource;
Optional tableFactory = catalog.getTableFactory();
-   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table))
-   
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
+   if (tableFactory.isPresent()) {
+   TableFactory tf = tableFactory.get();
+   if (tf instanceof TableSourceFactory) {
+   tableSource = ((TableSourceFactory) 
tf).createTableSource(tablePath, table);
+   } else {
+   throw new TableException(String.format("Cannot 
query a sink-only table. TableFactory provided by catalog %s must implement 
TableSourceFactory",
+   catalog.getClass()));
+   }
+   } else {
+   tableSource = 
TableFactoryUtil.findAndCreateTableSource(table);
+   }
 
if (!(tableSource instanceof StreamTableSource)) {
throw new TableException("Catalog tables support only 
StreamTableSource and InputFormatTableSource");



[flink] 03/03: [hotfix][table-api-java] Moved QueryOperation utilities to o.a.f.t.operations.utils

2019-07-02 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cc8e44adfaab2d6e3ae4c1992b8fa73c4066c81
Author: Dawid Wysakowicz 
AuthorDate: Tue Jul 2 16:46:23 2019 +0200

[hotfix][table-api-java] Moved QueryOperation utilities to 
o.a.f.t.operations.utils

This closes #8860
---
 .../flink/table/api/internal/TableEnvironmentImpl.java|  2 +-
 .../org/apache/flink/table/api/internal/TableImpl.java|  6 +++---
 .../operations/{ => utils}/OperationExpressionsUtils.java |  3 ++-
 .../operations/{ => utils}/OperationTreeBuilder.java  |  5 -
 .../{ => utils}/QueryOperationDefaultVisitor.java | 15 ++-
 .../utils/factories/AggregateOperationFactory.java|  2 +-
 .../operations/utils/factories/ColumnOperationUtils.java  |  2 +-
 .../utils/factories/ProjectionOperationFactory.java   |  4 ++--
 .../apache/flink/table/plan/QueryOperationConverter.java  |  2 +-
 .../apache/flink/table/plan/QueryOperationConverter.java  |  2 +-
 .../apache/flink/table/api/internal/TableEnvImpl.scala|  1 +
 11 files changed, 31 insertions(+), 13 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 9b04f56..e94c65a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -50,9 +50,9 @@ import 
org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.OperationTreeBuilder;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
+import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 79f8502..8d33e10 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -40,10 +40,10 @@ import 
org.apache.flink.table.expressions.resolver.LookupCallResolver;
 import org.apache.flink.table.functions.TemporalTableFunction;
 import org.apache.flink.table.functions.TemporalTableFunctionImpl;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
-import org.apache.flink.table.operations.OperationExpressionsUtils;
-import 
org.apache.flink.table.operations.OperationExpressionsUtils.CategorizedExpressions;
-import org.apache.flink.table.operations.OperationTreeBuilder;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
+import 
org.apache.flink.table.operations.utils.OperationExpressionsUtils.CategorizedExpressions;
+import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 
 import java.util.Arrays;
 import java.util.Collections;
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
similarity index 98%
rename from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
index eb2030d..9bf5b59 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.expressions.CallExpression;
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.tabl

[flink] 02/03: [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module

2019-07-02 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d167a3670614901e4ef011af92b4045c7eb1612
Author: Dawid Wysakowicz 
AuthorDate: Sat Jun 1 19:08:56 2019 +0200

[FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to 
table-api-java module
---
 .../java/internal/StreamTableEnvironmentImpl.java  |   8 +-
 .../flink/table/api/EnvironmentSettings.java   |   7 +
 .../table/api/internal/TableEnvironmentImpl.java   |  28 +-
 .../expressions/utils/ApiExpressionUtils.java  |  29 +-
 .../operations/OperationExpressionsUtils.java  |   4 +-
 .../table/operations/OperationTreeBuilder.java | 680 +++--
 .../internal/StreamTableEnvironmentImpl.scala  |  10 +-
 .../flink/table/expressions/ExpressionUtils.java   |   8 +-
 .../operations/OperationTreeBuilderFactory.java|  44 --
 .../flink/table/api/internal/TableEnvImpl.scala|  17 +-
 .../operations/OperationTreeBuilderImpl.scala  | 600 --
 .../api/stream/StreamTableEnvironmentTest.scala|   3 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |   3 +-
 .../apache/flink/table/utils/TableTestBase.scala   |   6 +-
 14 files changed, 702 insertions(+), 745 deletions(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 6b37690..05815dd 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -85,8 +85,9 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl imple
TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment,
Planner planner,
-   Executor executor) {
-   super(catalogManager, tableConfig, executor, functionCatalog, 
planner);
+   Executor executor,
+   boolean isStreaming) {
+   super(catalogManager, tableConfig, executor, functionCatalog, 
planner, isStreaming);
this.executionEnvironment = executionEnvironment;
}
 
@@ -119,7 +120,8 @@ public final class StreamTableEnvironmentImpl extends 
TableEnvironmentImpl imple
tableConfig,
executionEnvironment,
planner,
-   executor
+   executor,
+   !settings.isBatchMode()
);
}
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 37ba179..70b7ffd 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -114,6 +114,13 @@ public class EnvironmentSettings {
return builtInDatabaseName;
}
 
+   /**
+* Tells if the {@link TableEnvironment} should work in a batch or 
streaming mode.
+*/
+   public boolean isBatchMode() {
+   return isBatchMode;
+   }
+
@Internal
public Map toPlannerProperties() {
Map properties = new 
HashMap<>(toCommonProperties());
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 727727a..9b04f56 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ExternalCatalog;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -46,7 +45,6 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import or
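
For context on the new isStreaming flag, a small sketch of reading the isBatchMode() accessor introduced here. It assumes the fluent inStreamingMode()/inBatchMode() builder methods on EnvironmentSettings, which are not part of this diff:

    import org.apache.flink.table.api.EnvironmentSettings;

    public class EnvironmentSettingsSketch {

        public static void main(String[] args) {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .build();

            // The accessor added in this commit; a streaming setup reports false here,
            // and StreamTableEnvironmentImpl.create passes !settings.isBatchMode() as isStreaming.
            System.out.println("batch mode: " + settings.isBatchMode());
        }
    }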

[flink] 01/03: [hotfix][table-planner][table-api-java] Move QueryOperation factories to table-api-java

2019-07-02 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39e4ad2097506f646a1c8a78f753628ad1debb82
Author: Dawid Wysakowicz 
AuthorDate: Tue Jul 2 14:55:29 2019 +0200

[hotfix][table-planner][table-api-java] Move QueryOperation factories to 
table-api-java
---
 .../operations/utils/factories}/AggregateOperationFactory.java | 5 -
 .../table/operations/utils/factories}/AliasOperationUtils.java | 3 ++-
 .../table/operations/utils/factories}/CalculatedTableFactory.java  | 4 +++-
 .../table/operations/utils/factories}/ColumnOperationUtils.java| 2 +-
 .../table/operations/utils/factories}/JoinOperationFactory.java| 5 -
 .../operations/utils/factories}/ProjectionOperationFactory.java| 4 +++-
 .../table/operations/utils/factories}/SetOperationFactory.java | 4 +++-
 .../table/operations/utils/factories}/SortOperationFactory.java| 4 +++-
 .../apache/flink/table/operations/OperationTreeBuilderImpl.scala   | 7 ---
 9 files changed, 27 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
similarity index 98%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
index ac6ee30..8fef793 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -45,6 +45,9 @@ import 
org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionRequirement;
 import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
+import org.apache.flink.table.operations.AggregateQueryOperation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.WindowAggregateQueryOperation;
 import 
org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java
similarity index 97%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java
index c73ca47..a9a45d0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableSchema;
@@ -27,6 +27,7 @@ import 
org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.operations.QueryOperation;
 
 import java.util.List;
 import java.util.stream.Collectors;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
index 2e10ee8..fe01128 100644
--- 
a/flink-table/flink-table-plann

[flink] branch master updated (d903480 -> 6cc8e44)

2019-07-02 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d903480  [FLINK-13045][table] Move Scala expression DSL to 
flink-table-api-scala
 new 39e4ad2  [hotfix][table-planner][table-api-java] Move QueryOperation 
factories to table-api-java
 new 1d167a3  [FLINK-12906][table-planner][table-api-java] Ported 
OperationTreeBuilder to table-api-java module
 new 6cc8e44  [hotfix][table-api-java] Moved QueryOperation utilities to 
o.a.f.t.operations.utils

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/internal/StreamTableEnvironmentImpl.java  |   8 +-
 .../flink/table/api/EnvironmentSettings.java   |   7 +
 .../table/api/internal/TableEnvironmentImpl.java   |  30 +-
 .../apache/flink/table/api/internal/TableImpl.java |   6 +-
 .../expressions/utils/ApiExpressionUtils.java  |  29 +-
 .../table/operations/OperationTreeBuilder.java | 110 
 .../{ => utils}/OperationExpressionsUtils.java |   7 +-
 .../operations/utils/OperationTreeBuilder.java | 697 +
 .../{ => utils}/QueryOperationDefaultVisitor.java  |  15 +-
 .../factories}/AggregateOperationFactory.java  |   7 +-
 .../utils/factories}/AliasOperationUtils.java  |   3 +-
 .../utils/factories}/CalculatedTableFactory.java   |   4 +-
 .../utils/factories}/ColumnOperationUtils.java |   4 +-
 .../utils/factories}/JoinOperationFactory.java |   5 +-
 .../factories}/ProjectionOperationFactory.java |   8 +-
 .../utils/factories}/SetOperationFactory.java  |   4 +-
 .../utils/factories}/SortOperationFactory.java |   4 +-
 .../internal/StreamTableEnvironmentImpl.scala  |  10 +-
 .../flink/table/expressions/ExpressionUtils.java   |   8 +-
 .../flink/table/plan/QueryOperationConverter.java  |   2 +-
 .../operations/OperationTreeBuilderFactory.java|  44 --
 .../flink/table/plan/QueryOperationConverter.java  |   2 +-
 .../flink/table/api/internal/TableEnvImpl.scala|  18 +-
 .../operations/OperationTreeBuilderImpl.scala  | 599 --
 .../api/stream/StreamTableEnvironmentTest.scala|   3 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |   3 +-
 .../apache/flink/table/utils/TableTestBase.scala   |   6 +-
 27 files changed, 817 insertions(+), 826 deletions(-)
 delete mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
 rename 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{
 => utils}/OperationExpressionsUtils.java (98%)
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
 rename 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{
 => utils}/QueryOperationDefaultVisitor.java (73%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/AggregateOperationFactory.java
 (98%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/AliasOperationUtils.java
 (97%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/CalculatedTableFactory.java
 (96%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/ColumnOperationUtils.java
 (97%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/JoinOperationFactory.java
 (95%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/ProjectionOperationFactory.java
 (95%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/SetOperationFactory.java
 (95%)
 rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/SortOperationFactory.java
 (96%)
 delete mode 100644 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
 delete mod

[flink] branch master updated: [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d903480  [FLINK-13045][table] Move Scala expression DSL to 
flink-table-api-scala
d903480 is described below

commit d903480ef7abc95cd419e6a194db2276dd7eb4cb
Author: Timo Walther 
AuthorDate: Tue Jul 2 16:55:08 2019 +0200

[FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala

This moves the Scala expression DSL to flink-table-api-scala.

Users of pure table programs should define their imports like:

import org.apache.flink.table.api._
TableEnvironment.create(...)

Users of the DataStream API should define their imports like:

import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
StreamTableEnvironment.create(...)

This commit did not split the package object 
org.apache.flink.table.api.scala._ into
two parts yet because we want to give users the chance to update their 
imports.

This closes #8945.
---
 docs/dev/table/tableApi.md |   4 +
 docs/dev/table/tableApi.zh.md  |   4 +
 flink-table/flink-table-api-scala-bridge/pom.xml   |   9 +-
 .../flink/table/api/scala/DataSetConversions.scala |   6 +-
 .../table/api/scala/DataStreamConversions.scala|   6 +-
 .../flink/table/api/scala/TableConversions.scala   |  25 +-
 .../org/apache/flink/table/api/scala/package.scala |  89 ++
 flink-table/flink-table-api-scala/pom.xml  |  47 +++
 .../apache/flink/table/api}/expressionDsl.scala| 319 ++---
 .../scala/org/apache/flink/table/api/package.scala |  48 
 .../scala/org/apache/flink/table/api/package.scala |  34 ---
 .../org/apache/flink/table/api/scala/package.scala |  93 --
 12 files changed, 307 insertions(+), 377 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a0bd51a..bd6bd38 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala 
and Java Table API
 The Java Table API is enabled by importing 
`org.apache.flink.table.api.java.*`. The following example shows how a Java 
Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.java._
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is 
constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 7a75c0a..e4b8a55 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala 
and Java Table API
 The Java Table API is enabled by importing 
`org.apache.flink.table.api.java.*`. The following example shows how a Java 
Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.java._
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is 
constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml 
b/flink-table/flink-table-api-scala-bridge/pom.xml
index 2a7f2ec..f49f71c 100644
--- a/flink-table/flink-table-api-scala-bridge/pom.xml
+++ b/flink-table/flink-table-api-scala-bridge/pom.xml
@@ -61,19 +61,18 @@ under the License.
net.alchim31.maven
scala-maven-plugin

-   
+   

scala-compile-first
process-resources

-   add-source
compile


 

[flink] 04/05: [FLINK-12883][runtime] Add getID() to ExecutionVertex

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f5fc239ee7712e02f3f3ebfc0a991acdcf6e3cf
Author: Gary Yao 
AuthorDate: Wed Jun 19 14:45:53 2019 +0200

[FLINK-12883][runtime] Add getID() to ExecutionVertex
---
 .../org/apache/flink/runtime/executiongraph/ExecutionVertex.java  | 8 
 1 file changed, 8 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a12d198..5fc0b72 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
@@ -83,6 +84,8 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable priorExecutions;
 
private final Time timeout;
@@ -142,6 +145,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable

[flink] 02/05: [hotfix][runtime] Use Set instead of IdentityHashMap where possible

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bce9a4c43fb6ff61762885cb0fbc984a7338ce19
Author: Gary Yao 
AuthorDate: Thu Jun 20 17:20:00 2019 +0200

[hotfix][runtime] Use Set instead of IdentityHashMap where possible

Replace instances where we use an IdentityHashMap as a set with
Collections.newSetFromMap() in RestartPipelinedRegionStrategy and
PipelinedRegionComputeUtil.
---
 .../failover/flip1/PipelinedRegionComputeUtil.java | 11 --
 .../flip1/RestartPipelinedRegionStrategy.java  | 25 +++---
 2 files changed, 17 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 291c894..b21b52f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -22,6 +22,7 @@ package 
org.apache.flink.runtime.executiongraph.failover.flip1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
@@ -41,8 +42,6 @@ public final class PipelinedRegionComputeUtil {
return 
uniqueRegions(buildOneRegionForAllVertices(topology));
}
 
-   // we use the map (list -> null) to imitate an IdentityHashSet 
(which does not exist)
-   // this helps to optimize the building performance as it uses 
reference equality
final Map> vertexToRegion = 
new IdentityHashMap<>();
 
// iterate all the vertices which are topologically sorted
@@ -105,11 +104,9 @@ public final class PipelinedRegionComputeUtil {
 
private static Set> uniqueRegions(final 
Map> vertexToRegion) {
// find out all the distinct regions
-   final IdentityHashMap, Object> 
distinctRegions = new IdentityHashMap<>();
-   for (Set regionVertices : 
vertexToRegion.values()) {
-   distinctRegions.put(regionVertices, null);
-   }
-   return distinctRegions.keySet();
+   final Set> distinctRegions = 
Collections.newSetFromMap(new IdentityHashMap<>());
+   distinctRegions.addAll(vertexToRegion.values());
+   return distinctRegions;
}
 
private PipelinedRegionComputeUtil() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index 58d1675..a87764b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -50,7 +51,7 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
private final FailoverTopology topology;
 
/** All failover regions. */
-   private final IdentityHashMap regions;
+   private final Set regions;
 
/** Maps execution vertex id to failover region. */
private final Map vertexToRegionMap;
@@ -80,7 +81,7 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker) {
 
this.topology = checkNotNull(topology);
-   this.regions = new IdentityHashMap<>();
+   this.regions = Collections.newSetFromMap(new 
IdentityHashMap<>());
this.vertexToRegionMap = new HashMap<>();
this.resultPartitionAvailabilityChecker = new 
RegionFailoverResultPartitionAvailabilityChecker(
resultPartitionAvailabilityChecker);
@@ -100,7 +101,7 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
for (Set regionVertices : distinctRegions) {
LOG.debug("Creating a failover region with {} 
vertices.", regionVertices.size());
final FailoverRegion failoverRegion = new 
FailoverRegion(regionVertices);
-   regions.put(failoverRegion, null);
+   regions.add(failo
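
The replacement leans on a small standard-library idiom: Collections.newSetFromMap() turns any Map into a Set view, so backing it with an IdentityHashMap gives reference-equality set semantics without a dedicated IdentityHashSet class. A minimal, self-contained sketch:

    import java.util.Collections;
    import java.util.IdentityHashMap;
    import java.util.Set;

    public class IdentitySetSketch {

        public static void main(String[] args) {
            // Reference-equality set, as now used for the failover regions.
            Set<String> identitySet = Collections.newSetFromMap(new IdentityHashMap<>());

            String a = new String("region");
            String b = new String("region"); // equal by equals(), but a distinct object

            identitySet.add(a);
            identitySet.add(b);

            // Prints 2: both references are kept, unlike a HashSet, which would keep only one.
            System.out.println(identitySet.size());
        }
    }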

[flink] 03/05: [hotfix][runtime] Remove obsolete comment from PipelinedRegionComputeUtil#uniqueRegions()

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit efebb99d5d3840047741cde4a73511252bd76e93
Author: Gary Yao 
AuthorDate: Thu Jun 20 17:23:21 2019 +0200

[hotfix][runtime] Remove obsolete comment from 
PipelinedRegionComputeUtil#uniqueRegions()
---
 .../executiongraph/failover/flip1/PipelinedRegionComputeUtil.java| 1 -
 1 file changed, 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index b21b52f..14d28b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -103,7 +103,6 @@ public final class PipelinedRegionComputeUtil {
}
 
private static Set> uniqueRegions(final 
Map> vertexToRegion) {
-   // find out all the distinct regions
final Set> distinctRegions = 
Collections.newSetFromMap(new IdentityHashMap<>());
distinctRegions.addAll(vertexToRegion.values());
return distinctRegions;



[flink] 01/05: [FLINK-12883][runtime] Extract computation of pipelined regions.

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8b72220154d3349b5157d02aab78bf8e3b381dd
Author: Gary Yao 
AuthorDate: Wed Jun 19 10:58:17 2019 +0200

[FLINK-12883][runtime] Extract computation of pipelined regions.
---
 .../failover/flip1/PipelinedRegionComputeUtil.java | 117 +
 .../flip1/RestartPipelinedRegionStrategy.java  |  75 +
 2 files changed, 119 insertions(+), 73 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
new file mode 100644
index 000..291c894
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility for computing pipelined regions.
+ */
+public final class PipelinedRegionComputeUtil {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
+
+   public static Set> computePipelinedRegions(final 
FailoverTopology topology) {
+   // currently we let a job with co-location constraints fail as 
one region
+   // putting co-located vertices in the same region with each 
other can be a future improvement
+   if (topology.containsCoLocationConstraints()) {
+   return 
uniqueRegions(buildOneRegionForAllVertices(topology));
+   }
+
+   // we use the map (list -> null) to imitate an IdentityHashSet 
(which does not exist)
+   // this helps to optimize the building performance as it uses 
reference equality
+   final Map> vertexToRegion = 
new IdentityHashMap<>();
+
+   // iterate all the vertices which are topologically sorted
+   for (FailoverVertex vertex : topology.getFailoverVertices()) {
+   Set currentRegion = new HashSet<>(1);
+   currentRegion.add(vertex);
+   vertexToRegion.put(vertex, currentRegion);
+
+   for (FailoverEdge inputEdge : vertex.getInputEdges()) {
+   if 
(inputEdge.getResultPartitionType().isPipelined()) {
+   final FailoverVertex producerVertex = 
inputEdge.getSourceVertex();
+   final Set 
producerRegion = vertexToRegion.get(producerVertex);
+
+   if (producerRegion == null) {
+   throw new 
IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
+   + " failover region is 
null while calculating failover region for the consumer task "
+   + 
vertex.getExecutionVertexName() + ". This should be a failover region building 
bug.");
+   }
+
+   // check if it is the same as the 
producer region, if so skip the merge
+   // this check can significantly reduce 
compute complexity in All-to-All PIPELINED edge case
+   if (currentRegion != producerRegion) {
+   // merge current region and 
producer region
+   // merge the smaller region 
into the larger one to reduce the cost
+   final Set 
smallerSet;
+   final Set 
largerSet;
+ 

[flink] 05/05: [FLINK-12883][runtime] Introduce PartitionReleaseStrategy

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9aa9a170ff657198c4710706bc3802de42063ca
Author: Gary Yao 
AuthorDate: Wed Jun 19 15:06:51 2019 +0200

[FLINK-12883][runtime] Introduce PartitionReleaseStrategy

- Introduce interface PartitionReleaseStrategy.
- Introduce RegionPartitionReleaseStrategy and
  NotReleasingPartitionReleaseStrategy implementations, which can be 
configured
  via a new config option.
- Add unit tests for new classes.
- Increase visibility of methods in TestingSchedulingTopology for unit tests
  outside of its package.
---
 .../flink/configuration/JobManagerOptions.java |   9 +
 .../runtime/executiongraph/ExecutionGraph.java | 132 +++---
 .../executiongraph/ExecutionGraphBuilder.java  |   6 +
 .../runtime/executiongraph/ExecutionVertex.java|   1 +
 .../failover/flip1/PipelinedRegionComputeUtil.java |  20 +-
 .../NotReleasingPartitionReleaseStrategy.java  |  56 ++
 .../partitionrelease/PartitionReleaseStrategy.java |  58 ++
 .../PartitionReleaseStrategyFactoryLoader.java |  41 +
 .../flip1/partitionrelease/PipelinedRegion.java|  69 +++
 .../PipelinedRegionConsumedBlockingPartitions.java |  51 ++
 .../PipelinedRegionExecutionView.java  |  64 +++
 .../RegionPartitionReleaseStrategy.java| 190 +++
 .../ExecutionGraphPartitionReleaseTest.java| 202 +
 .../RegionPartitionReleaseStrategyTest.java| 149 +++
 .../PartitionReleaseStrategyFactoryLoaderTest.java |  55 ++
 .../PipelinedRegionExecutionViewTest.java  |  75 
 .../strategy/TestingSchedulingTopology.java|  14 +-
 17 files changed, 1154 insertions(+), 38 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 89515fd..0f9e18d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -173,6 +173,15 @@ public class JobManagerOptions {
text("'legacy': legacy scheduler"),
text("'ng': new generation scheduler"))
.build());
+   /**
+* Config parameter controlling whether partitions should already be 
released during the job execution.
+*/
+   @Documentation.ExcludeFromDocumentation("User normally should not be 
expected to deactivate this feature. " +
+   "We aim at removing this flag eventually.")
+   public static final ConfigOption 
PARTITION_RELEASE_DURING_JOB_EXECUTION =
+   key("jobmanager.partition.release-during-job-execution")
+   .defaultValue(true)
+   .withDescription("Controls whether partitions should 
already be released during the job execution.");
 
@Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
public static final ConfigOption 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 514121e..ce65b68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -48,6 +48,9 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
 import 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -55,6 +58,7 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org
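
A minimal sketch of how the flag added above could be consulted. Only the option
itself comes from the diff; the class and method here are made up for illustration
and are not part of the commit.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;

    public class PartitionReleaseFlagSketch {

        /** Returns whether partitions may already be released while the job is still running. */
        public static boolean isReleaseDuringExecutionEnabled(Configuration jobManagerConfig) {
            // defaults to true, as declared by the option above
            return jobManagerConfig.getBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION);
        }
    }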

[flink] branch master updated (a0f747d -> c9aa9a1)

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a0f747d  [hotfix][runtime] Remove legacy NoOpIOManager class
 new a8b7222  [FLINK-12883][runtime] Extract computation of pipelined 
regions.
 new bce9a4c  [hotfix][runtime] Use Set instead of IdentityHashMap where 
possible
 new efebb99  [hotfix][runtime] Remove obsolete comment from 
PipelinedRegionComputeUtil#uniqueRegions()
 new 2f5fc23  [FLINK-12883][runtime] Add getID() to ExecutionVertex
 new c9aa9a1  [FLINK-12883][runtime] Introduce PartitionReleaseStrategy

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/configuration/JobManagerOptions.java |   9 +
 .../runtime/executiongraph/ExecutionGraph.java | 132 +++---
 .../executiongraph/ExecutionGraphBuilder.java  |   6 +
 .../runtime/executiongraph/ExecutionVertex.java|   9 +
 .../failover/flip1/PipelinedRegionComputeUtil.java | 131 +
 .../flip1/RestartPipelinedRegionStrategy.java  | 100 ++
 .../NotReleasingPartitionReleaseStrategy.java  |  56 ++
 .../partitionrelease/PartitionReleaseStrategy.java |  58 ++
 .../PartitionReleaseStrategyFactoryLoader.java}|  39 ++--
 .../flip1/partitionrelease/PipelinedRegion.java|  69 +++
 .../PipelinedRegionConsumedBlockingPartitions.java |  51 ++
 .../PipelinedRegionExecutionView.java  |  64 +++
 .../RegionPartitionReleaseStrategy.java| 190 +++
 .../ExecutionGraphPartitionReleaseTest.java| 202 +
 .../RegionPartitionReleaseStrategyTest.java| 149 +++
 .../PartitionReleaseStrategyFactoryLoaderTest.java |  55 ++
 .../PipelinedRegionExecutionViewTest.java  |  75 
 .../strategy/TestingSchedulingTopology.java|  14 +-
 18 files changed, 1263 insertions(+), 146 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java
 copy 
flink-runtime/src/{test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
 => 
main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java}
 (51%)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java



[flink] 03/06: [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 423e8a8afa4fd440ba2abb6c2b535f881ef84374
Author: Zhijiang 
AuthorDate: Fri Jun 28 12:18:47 2019 +0800

[hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown

IOManager#close ignored exceptions internally so that a single failing close would
not interrupt the remaining close operations, and IOManager#isProperlyShutDown was
then needed to check whether any exception occurred during the close process. Using
IOUtils#closeAll to run all close operations and finally rethrow the suppressed
exceptions achieves the same effect, so the isProperlyShutDown method can be removed
completely.
---
 .../flink/runtime/io/disk/iomanager/IOManager.java |  47 -
 .../runtime/io/disk/iomanager/IOManagerAsync.java  | 105 ++---
 .../flink/runtime/io/disk/ChannelViewsTest.java|   5 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |   3 +-
 .../flink/runtime/io/disk/SpillingBufferTest.java  |   5 +-
 .../AsynchronousBufferFileWriterTest.java  |   2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   |   2 +-
 .../BufferFileWriterFileSegmentReaderTest.java |   2 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |   2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java  |   3 +-
 .../runtime/io/disk/iomanager/IOManagerITCase.java |   1 -
 .../runtime/io/disk/iomanager/IOManagerTest.java   |   2 +-
 .../runtime/operators/hash/HashTableITCase.java|   5 +-
 .../hash/NonReusingHashJoinIteratorITCase.java |   5 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |   5 +-
 .../hash/ReOpenableHashTableTestBase.java  |   5 +-
 .../hash/ReusingHashJoinIteratorITCase.java|   5 +-
 .../resettable/SpillingResettableIteratorTest.java |   5 +-
 ...pillingResettableMutableObjectIteratorTest.java |   5 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |   5 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |   5 +-
 .../runtime/operators/sort/ExternalSortITCase.java |   5 +-
 .../sort/ExternalSortLargeRecordsITCase.java   |   5 +-
 .../sort/FixedLengthRecordSorterTest.java  |   2 +-
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |   5 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |   5 +-
 .../testutils/BinaryOperatorTestBase.java  |   3 +-
 .../operators/testutils/DriverTestBase.java|   1 -
 .../operators/testutils/MockEnvironment.java   |   4 +-
 .../runtime/operators/testutils/TaskTestBase.java  |   2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |   1 -
 .../operators/util/HashVsSortMiniBenchmark.java|   5 +-
 .../runtime/taskexecutor/TaskExecutorTest.java |   1 -
 .../streaming/runtime/io/BufferSpillerTest.java|   2 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |   2 +-
 .../io/SpillingCheckpointBarrierAlignerTest.java   |   2 +-
 .../streaming/runtime/tasks/OperatorChainTest.java |   2 +-
 .../runtime/tasks/StreamTaskTestHarness.java   |   1 -
 .../util/AbstractStreamOperatorTestHarness.java|   4 +-
 .../flink/table/runtime/aggregate/HashAggTest.java |   5 +-
 .../runtime/hashtable/BinaryHashTableTest.java |   5 +-
 .../io/CompressedHeaderlessChannelTest.java|   6 +-
 .../join/Int2SortMergeJoinOperatorTest.java|   5 +-
 .../runtime/sort/BinaryExternalSorterTest.java |   5 +-
 .../runtime/sort/BufferedKVExternalSorterTest.java |   5 +-
 45 files changed, 111 insertions(+), 196 deletions(-)
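
As a small sketch of the pattern described in the commit message: IOUtils#closeAll
attempts to close every resource and rethrows the first failure with the remaining
failures attached as suppressed exceptions, so no separate isProperlyShutDown flag
is needed. The resources below are stand-ins, not Flink classes.

    import org.apache.flink.util.IOUtils;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class CloseAllSketch {

        public static void main(String[] args) {
            AutoCloseable writer = () -> System.out.println("writer closed");
            AutoCloseable reader = () -> { throw new IOException("close failed"); };
            List<AutoCloseable> resources = Arrays.asList(writer, reader);
            try {
                // closes both resources; the IOException from the reader is rethrown
                IOUtils.closeAll(resources);
            } catch (Exception e) {
                // callers observe the failure directly instead of polling a shutdown flag
                System.out.println("shutdown failed: " + e.getMessage());
            }
        }
    }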

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index ee54b1e..a649e42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -24,15 +24,19 @@ import 
org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.FileUtils;
 
+import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 /**
  * The facade for the provided I/O manager services.
@@ -82,39 +86,26 @@ public abstract class IOManager implements AutoCloseable {
}
 
/**
-* Close method, marks the I/O manager as closed
-* and removed all temporary files.
+* Removes all temporary files.
 */
@Override
-   public void close() {
-   // remove all of our temp directories
-   

[flink] 02/06: [hotfix][runtime] IOManager implements AutoCloseable

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5809ed32f869fa880a70a774d5b8365fe59dba4a
Author: Zhijiang 
AuthorDate: Tue Jun 18 12:09:04 2019 +0800

[hotfix][runtime] IOManager implements AutoCloseable
---
 .../flink/runtime/io/disk/iomanager/IOManager.java |  5 ++--
 .../runtime/io/disk/iomanager/IOManagerAsync.java  |  8 +++---
 .../runtime/taskexecutor/TaskManagerServices.java  |  2 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java|  2 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |  2 +-
 .../runtime/io/disk/FileChannelStreamsTest.java| 12 ++--
 .../io/disk/SeekableFileChannelInputViewTest.java  |  6 +---
 .../flink/runtime/io/disk/SpillingBufferTest.java  |  2 +-
 .../AsynchronousBufferFileWriterTest.java  |  2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   | 19 +++--
 .../BufferFileWriterFileSegmentReaderTest.java |  2 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |  2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java  |  4 +--
 .../runtime/io/disk/iomanager/IOManagerITCase.java |  2 +-
 .../runtime/io/disk/iomanager/IOManagerTest.java   | 33 --
 .../runtime/operators/hash/HashTableITCase.java|  2 +-
 .../hash/HashTablePerformanceComparison.java   |  5 +---
 .../runtime/operators/hash/HashTableTest.java  | 21 ++
 .../hash/NonReusingHashJoinIteratorITCase.java |  2 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |  2 +-
 .../hash/ReOpenableHashTableTestBase.java  |  2 +-
 .../hash/ReusingHashJoinIteratorITCase.java|  2 +-
 .../resettable/SpillingResettableIteratorTest.java |  2 +-
 ...pillingResettableMutableObjectIteratorTest.java |  2 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |  2 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |  2 +-
 .../runtime/operators/sort/ExternalSortITCase.java |  2 +-
 .../sort/ExternalSortLargeRecordsITCase.java   |  2 +-
 .../sort/FixedLengthRecordSorterTest.java  |  2 +-
 .../operators/sort/LargeRecordHandlerITCase.java   | 18 ++--
 .../operators/sort/LargeRecordHandlerTest.java | 21 ++
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |  2 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |  2 +-
 .../operators/sort/UnilateralSortMergerTest.java   |  4 +--
 .../testutils/BinaryOperatorTestBase.java  |  2 +-
 .../operators/testutils/DriverTestBase.java|  2 +-
 .../operators/testutils/MockEnvironment.java   |  2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |  2 +-
 .../operators/util/HashVsSortMiniBenchmark.java|  2 +-
 .../streaming/runtime/io/BufferSpillerTest.java|  2 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |  2 +-
 .../CheckpointBarrierAlignerMassiveRandomTest.java |  7 +
 .../io/SpillingCheckpointBarrierAlignerTest.java   |  2 +-
 .../StreamNetworkBenchmarkEnvironment.java |  2 +-
 .../runtime/tasks/StreamTaskTestHarness.java   |  2 +-
 .../flink/table/runtime/aggregate/HashAggTest.java |  2 +-
 .../runtime/hashtable/BinaryHashTableTest.java |  2 +-
 .../io/CompressedHeaderlessChannelTest.java|  2 +-
 .../join/Int2SortMergeJoinOperatorTest.java|  2 +-
 .../runtime/sort/BinaryExternalSorterTest.java |  2 +-
 .../runtime/sort/BufferedKVExternalSorterTest.java |  2 +-
 .../manual/HashTableRecordWidthCombinations.java   |  7 +
 .../flink/test/manual/MassiveStringSorting.java| 14 ++---
 .../test/manual/MassiveStringValueSorting.java | 14 ++---
 54 files changed, 82 insertions(+), 192 deletions(-)
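
Since IOManager now implements AutoCloseable, test code can manage it with
try-with-resources instead of calling shutdown() explicitly. A sketch only,
assuming the no-arg IOManagerAsync constructor used throughout the touched tests.

    import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
    import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

    public class IOManagerAutoCloseableSketch {

        public static void main(String[] args) throws Exception {
            // close() is invoked automatically at the end of the block and removes temp files
            try (IOManager ioManager = new IOManagerAsync()) {
                FileIOChannel.ID channel = ioManager.createChannel();
                System.out.println("spill channel at: " + channel.getPath());
            }
        }
    }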

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 6723597..ee54b1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * The facade for the provided I/O manager services.
  */
-public abstract class IOManager {
+public abstract class IOManager implements AutoCloseable {
protected static final Logger LOG = 
LoggerFactory.getLogger(IOManager.class);
 
/** The temporary directories for files. */
@@ -85,7 +85,8 @@ public abstract class IOManager {
 * Close method, marks the I/O manager as closed
 * and removed all temporary files.
 */
-   public void shutdown() {
+   @Override
+   public void close() {
// remove all of our temp directories
for (File path : paths) {
try {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink

[flink] 06/06: [hotfix][runtime] Remove legacy NoOpIOManager class

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0f747dca5b22312c50056aacb7d5b6f8c3fa1ac
Author: Zhijiang 
AuthorDate: Mon Jun 24 11:59:09 2019 +0800

[hotfix][runtime] Remove legacy NoOpIOManager class
---
 .../runtime/io/disk/iomanager/NoOpIOManager.java   | 73 --
 1 file changed, 73 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java
deleted file mode 100644
index f98c46f..000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * An {@link IOManager} that cannot do I/O but serves as a mock for tests.
- */
-public class NoOpIOManager extends IOManager {
-
-   public NoOpIOManager() {
-   super(new String[] 
{EnvironmentInformation.getTemporaryFileDirectory()});
-   }
-
-   @Override
-   public BlockChannelWriter createBlockChannelWriter(ID 
channelID, LinkedBlockingQueue returnQueue) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public BlockChannelWriterWithCallback 
createBlockChannelWriter(ID channelID, RequestDoneCallback 
callback) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public BlockChannelReader createBlockChannelReader(ID 
channelID, LinkedBlockingQueue returnQueue) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public BufferFileWriter createBufferFileWriter(ID channelID) throws 
IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public BufferFileReader createBufferFileReader(ID channelID, 
RequestDoneCallback callback) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public BufferFileSegmentReader createBufferFileSegmentReader(ID 
channelID, RequestDoneCallback callback) throws IOException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public BulkBlockChannelReader createBulkBlockChannelReader(ID 
channelID, List targetSegments, int numBlocks) throws 
IOException {
-   throw new UnsupportedOperationException();
-   }
-}



[flink] branch master updated (95944e2 -> a0f747d)

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 95944e2  [FLINK-12798][table] Add a discovery mechanism for switching 
between Flink/Blink Planner/Executor
 new cdbfb82  [hotfix][runtime] Cleanup IOManager code
 new 5809ed3  [hotfix][runtime] IOManager implements AutoCloseable
 new 423e8a8  [hotfix][runtime] Refactor IOManager#close and remove 
#isProperlyShutDown
 new f9acd2f  [FLINK-12735][network] Refactor IOManager to introduce 
FileChannelManager
 new 80003d6  [FLINK-12735][network] Make shuffle environment 
implementation independent with IOManager
 new a0f747d  [hotfix][runtime] Remove legacy NoOpIOManager class

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/io/disk/FileChannelManager.java  |  37 ++--
 .../runtime/io/disk/FileChannelManagerImpl.java| 126 +
 .../runtime/io/disk/iomanager/FileIOChannel.java   |  78 
 .../flink/runtime/io/disk/iomanager/IOManager.java | 206 +++--
 .../runtime/io/disk/iomanager/IOManagerAsync.java  | 107 ++-
 .../io/network/NettyShuffleEnvironment.java|  13 ++
 .../io/network/NettyShuffleServiceFactory.java |  17 +-
 .../network/partition/ResultPartitionFactory.java  |  16 +-
 .../runtime/shuffle/ShuffleEnvironmentContext.java |  10 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  11 +-
 .../NettyShuffleEnvironmentConfiguration.java  |  23 ++-
 .../flink/runtime/io/disk/ChannelViewsTest.java|   7 +-
 .../runtime/io/disk/FileChannelStreamsITCase.java  |   5 +-
 .../runtime/io/disk/FileChannelStreamsTest.java|  12 +-
 .../runtime/io/disk/NoOpFileChannelManager.java}   |  33 ++--
 .../io/disk/SeekableFileChannelInputViewTest.java  |   6 +-
 .../flink/runtime/io/disk/SpillingBufferTest.java  |   7 +-
 .../AsynchronousBufferFileWriterTest.java  |   4 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   |  21 +--
 .../BufferFileWriterFileSegmentReaderTest.java |   4 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |   4 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java  |   7 +-
 .../IOManagerAsyncWithNoOpBufferFileWriter.java|  53 --
 .../runtime/io/disk/iomanager/IOManagerITCase.java |   3 +-
 .../runtime/io/disk/iomanager/IOManagerTest.java   |  35 ++--
 .../runtime/io/disk/iomanager/NoOpIOManager.java   |  73 
 .../io/network/NettyShuffleEnvironmentBuilder.java |  17 +-
 .../io/network/NettyShuffleEnvironmentTest.java|  23 ++-
 .../partition/BoundedBlockingSubpartitionTest.java |  24 ++-
 .../BoundedBlockingSubpartitionWriteReadTest.java  |  21 ++-
 .../io/network/partition/PartitionTestUtils.java   |  21 +++
 .../network/partition/ResultPartitionBuilder.java  |  12 +-
 .../partition/ResultPartitionFactoryTest.java  |  22 ++-
 .../io/network/partition/ResultPartitionTest.java  |  27 ++-
 .../runtime/operators/hash/HashTableITCase.java|   7 +-
 .../hash/HashTablePerformanceComparison.java   |   5 +-
 .../runtime/operators/hash/HashTableTest.java  |  21 +--
 .../hash/NonReusingHashJoinIteratorITCase.java |   7 +-
 .../operators/hash/ReOpenableHashTableITCase.java  |   7 +-
 .../hash/ReOpenableHashTableTestBase.java  |   7 +-
 .../hash/ReusingHashJoinIteratorITCase.java|   7 +-
 .../resettable/SpillingResettableIteratorTest.java |   7 +-
 ...pillingResettableMutableObjectIteratorTest.java |   7 +-
 .../AbstractSortMergeOuterJoinIteratorITCase.java  |   7 +-
 .../sort/CombiningUnilateralSortMergerITCase.java  |   7 +-
 .../runtime/operators/sort/ExternalSortITCase.java |   7 +-
 .../sort/ExternalSortLargeRecordsITCase.java   |   7 +-
 .../sort/FixedLengthRecordSorterTest.java  |   4 +-
 .../operators/sort/LargeRecordHandlerITCase.java   |  42 ++---
 .../operators/sort/LargeRecordHandlerTest.java |  21 +--
 ...NonReusingSortMergeInnerJoinIteratorITCase.java |   7 +-
 .../ReusingSortMergeInnerJoinIteratorITCase.java   |   7 +-
 .../operators/sort/UnilateralSortMergerTest.java   |   4 +-
 .../testutils/BinaryOperatorTestBase.java  |   5 +-
 .../operators/testutils/DriverTestBase.java|   3 +-
 .../operators/testutils/MockEnvironment.java   |   6 +-
 .../runtime/operators/testutils/TaskTestBase.java  |   2 +-
 .../operators/testutils/UnaryOperatorTestBase.java |   3 +-
 .../operators/util/HashVsSortMiniBenchmark.java|   7 +-
 .../runtime/taskexecutor/TaskExecutorTest.java |   1 -
 .../streaming/runtime/io/BufferSpillerTest.java|   4 +-
 ...CheckpointBarrierAlignerAlignmentLimitTest.java |   4 +-
 .../CheckpointBarrierAlignerMassiveRandomTest.java |   7 +-
 .../io/SpillingCheckpointBarrierAlignerTest.

[flink] 05/06: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80003d62f386fc95a3dbbc414f2cd4de7a26e1bd
Author: Zhijiang 
AuthorDate: Thu Jun 27 00:29:16 2019 +0800

[FLINK-12735][network] Make shuffle environment implementation independent 
with IOManager

The creation of NettyShuffleEnvironment currently relies on the IOManager from
TaskManagerServices. The shuffle service only needs file channels while creating
partitions, so it can internally create a light-weight FileChannelManager with its
own folder-name prefix instead of depending on the heavy-weight IOManagerAsync.
---
 .../io/network/NettyShuffleEnvironment.java| 13 ++
 .../io/network/NettyShuffleServiceFactory.java | 17 +---
 .../network/partition/ResultPartitionFactory.java  | 16 +++
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 10 +
 .../runtime/taskexecutor/TaskManagerServices.java  |  9 ++--
 .../NettyShuffleEnvironmentConfiguration.java  | 23 --
 .../runtime/io/disk/NoOpFileChannelManager.java| 51 ++
 .../io/network/NettyShuffleEnvironmentBuilder.java | 17 
 .../io/network/NettyShuffleEnvironmentTest.java| 23 +-
 .../partition/BoundedBlockingSubpartitionTest.java | 24 +-
 .../BoundedBlockingSubpartitionWriteReadTest.java  | 21 -
 .../io/network/partition/PartitionTestUtils.java   | 21 +
 .../network/partition/ResultPartitionBuilder.java  | 12 ++---
 .../partition/ResultPartitionFactoryTest.java  | 22 +-
 .../io/network/partition/ResultPartitionTest.java  | 27 +++-
 .../StreamNetworkBenchmarkEnvironment.java |  7 ---
 16 files changed, 250 insertions(+), 63 deletions(-)
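
A hedged sketch of the idea above: the shuffle environment creating its own
light-weight FileChannelManager instead of borrowing the TaskManager's IOManager.
The (tempDirs, prefix) constructor shape is an assumption for illustration based on
the DIR_NAME_PREFIX field visible in the diff, not a verified signature.

    import org.apache.flink.runtime.io.disk.FileChannelManager;
    import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;

    public class ShuffleFileChannelSketch {

        // prefix as shown in the NettyShuffleServiceFactory diff below
        private static final String DIR_NAME_PREFIX = "netty-shuffle";

        public static FileChannelManager createForShuffle(String[] tempDirs) {
            // assumed constructor: configured temp dirs plus a folder-name prefix
            return new FileChannelManagerImpl(tempDirs, DIR_NAME_PREFIX);
        }
    }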

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 5171d75..17fb2cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
@@ -85,6 +86,8 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment inputGatesById;
 
private final ResultPartitionFactory resultPartitionFactory;
@@ -99,6 +102,7 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment(10);
+   this.fileChannelManager = fileChannelManager;
this.resultPartitionFactory = resultPartitionFactory;
this.singleInputGateFactory = singleInputGateFactory;
this.isClosed = false;
@@ -326,6 +331,14 @@ public class NettyShuffleEnvironment implements 
ShuffleEnvironment 
{
 
+   private static final String DIR_NAME_PREFIX = "netty-shuffle";
+
@Override
public NettyShuffleMaster createShuffleMaster(Configuration 
configuration) {
return NettyShuffleMaster.INSTANCE;
@@ -62,8 +65,7 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory shuffleEnvironment = 
createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
-   taskManagerMetricGroup,
-   ioManager);
+   taskManagerMetricGroup);
final int dataPort = shuffleEnvironment.start();
 
final KvStateService kvStateService = 
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -307,8 +306,7 @@ public class TaskManagerServices {
private static ShuffleEnvironment createShuffleEnvironment(
TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
TaskEventDispatcher taskEventDispatcher,
-   MetricGroup taskManagerMetricGroup,
-   IOManager ioManager) throws FlinkException {
+   MetricGroup taskManagerMetricGroup) throws 
FlinkException {
 
final ShuffleEnvironmentContext shuffleEnvironmentContext = new 
ShuffleEnvironmentContext(
taskManagerServicesConfiguration.getConfiguration(),
@@ -317,8 +315,7 @@ public class TaskManagerServices {
 

[flink] 01/06: [hotfix][runtime] Cleanup IOManager code

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdbfb82ef3eaa242abf6d070463c0895ac244ef1
Author: Zhijiang 
AuthorDate: Wed Jun 19 12:36:54 2019 +0800

[hotfix][runtime] Cleanup IOManager code
---
 .../runtime/io/disk/iomanager/FileIOChannel.java   | 78 ++--
 .../flink/runtime/io/disk/iomanager/IOManager.java | 83 +++---
 .../IOManagerAsyncWithNoOpBufferFileWriter.java| 53 --
 .../operators/sort/LargeRecordHandlerITCase.java   | 26 ---
 4 files changed, 91 insertions(+), 149 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index fd8e8e6..ef57e03 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -18,93 +18,90 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.util.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.util.StringUtils;
-
 /**
  * A Channel represents a collection of files that belong logically to the 
same resource. An example is a collection of
  * files that contain sorted runs of data from the same stream, that will 
later on be merged together.
  */
 public interface FileIOChannel {
-   
+
/**
 * Gets the channel ID of this I/O channel.
-* 
+*
 * @return The channel ID.
 */
-   FileIOChannel.ID getChannelID();
-   
+   ID getChannelID();
+
/**
 * Gets the size (in bytes) of the file underlying the channel.
-* 
-* @return The size (in bytes) of the file underlying the channel.
 */
long getSize() throws IOException;
-   
+
/**
 * Checks whether the channel has been closed.
-* 
+*
 * @return True if the channel has been closed, false otherwise.
 */
boolean isClosed();
 
/**
-   * Closes the channel. For asynchronous implementations, this method 
waits until all pending requests are
-   * handled. Even if an exception interrupts the closing, the underlying 
FileChannel is closed.
-   * 
-   * @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
-   */
+* Closes the channel. For asynchronous implementations, this method 
waits until all pending requests are
+* handled. Even if an exception interrupts the closing, the underlying 
FileChannel is closed.
+*
+* @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
+*/
void close() throws IOException;
 
/**
 * Deletes the file underlying this I/O channel.
-*  
+*
 * @throws IllegalStateException Thrown, when the channel is still open.
 */
void deleteChannel();
-   
-   /**
-   * Closes the channel and deletes the underlying file.
-   * For asynchronous implementations, this method waits until all pending 
requests are handled;
-   * 
-   * @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
-   */
-   public void closeAndDelete() throws IOException;
 
FileChannel getNioFileChannel();
-   
+
+   /**
+* Closes the channel and deletes the underlying file. For asynchronous 
implementations,
+* this method waits until all pending requests are handled.
+*
+* @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
+*/
+   void closeAndDelete() throws IOException;
+
// 

// 

-   
+
/**
 * An ID identifying an underlying file channel.
 */
-   public static class ID {
-   
+   class ID {
+
private static final int RANDOM_BYTES_LENGTH = 16;
-   
+
private final File path;
-   
+
private final int threadNum;
 
-   protected ID(File path, int threadNum) {
+   private ID(File path, int threadNum) {
this.path = path;
this.threadNum = threadNum;
}
 
-   protected ID(File basePath, int threadNum, Random random) {
+   public ID(File basePath, int t

[flink] 04/06: [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager

2019-07-02 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9acd2ff317b4a6181e85ba50ddbe177573351d7
Author: Zhijiang 
AuthorDate: Mon Jul 1 23:31:30 2019 +0800

[FLINK-12735][network] Refactor IOManager to introduce FileChannelManager

IOManager mainly has two roles: managing file channels based on the configured temp
dirs, and abstracting the ways to read/write files. We can define a FileChannelManager
class for handling the file channels, which the shuffle environment can reuse in the
future. That way the shuffle environment does not need to rely on the whole IOManager.
---
 .../flink/runtime/io/disk/FileChannelManager.java  |  45 
 .../runtime/io/disk/FileChannelManagerImpl.java| 126 +
 .../flink/runtime/io/disk/iomanager/IOManager.java | 119 ++-
 3 files changed, 203 insertions(+), 87 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
new file mode 100644
index 000..22079db
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * The manager used for creating/getting file IO channels based on config temp 
dirs.
+ */
+public interface FileChannelManager extends AutoCloseable {
+
+   /**
+* Creates an ID identifying an underlying file channel and returns it.
+*/
+   ID createChannel();
+
+   /**
+* Creates an enumerator for channels that logically belong together 
and returns it.
+*/
+   Enumerator createChannelEnumerator();
+
+   /**
+* Gets all the files corresponding to the config temp dirs.
+*/
+   File[] getPaths();
+}
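
A short usage sketch for the interface just added: callers ask the FileChannelManager
for channel IDs instead of going through the full IOManager. The helper below is
illustrative and not part of the commit.

    import org.apache.flink.runtime.io.disk.FileChannelManager;
    import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;

    public class FileChannelManagerUsageSketch {

        /** Asks the manager for a fresh channel ID backing one spill file. */
        public static FileIOChannel.ID nextSpillChannel(FileChannelManager fileChannelManager) {
            // the returned ID points at a file under one of fileChannelManager.getPaths()
            return fileChannelManager.createChannel();
        }
    }
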
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
new file mode 100644
index 000..2bdb8d9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The manager used for creating/deleting file channels based on config temp 
dirs.
+ */
+public class FileChannelManagerImpl implements FileChannelManager {
+   priv

[flink] 02/02: [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95944e231315f085d5e23717332aa2866caa5d8a
Author: Dawid Wysakowicz 
AuthorDate: Wed Jun 19 15:00:23 2019 +0200

[FLINK-12798][table] Add a discovery mechanism for switching between 
Flink/Blink Planner/Executor

This closes #8852.
---
 flink-python/pyflink/table/table_config.py |  36 
 .../table/tests/test_table_environment_api.py  |  12 --
 .../client/gateway/local/ExecutionContextTest.java |   3 +-
 .../client/gateway/local/LocalExecutorITCase.java  |   7 +-
 .../table/api/java/BatchTableEnvironment.java  |   5 +-
 .../table/api/java/StreamTableEnvironment.java | 155 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |  66 +++---
 .../flink/table/api/EnvironmentSettings.java   | 228 +
 .../org/apache/flink/table/api/TableConfig.java|  44 
 .../apache/flink/table/api/TableEnvironment.java   |  22 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/delegation/ExecutorFactory.java|  50 +
 .../internal => delegation}/PlannerFactory.java|  49 ++---
 .../flink/table/factories/ComponentFactory.java|  53 +
 .../table/factories/ComponentFactoryService.java   |  80 
 .../factories/ComponentFactoryServiceTest.java |  68 ++
 .../factories/utils/OtherTestPlannerFactory.java   |  28 +++
 .../table/factories/utils/TestPlannerFactory.java  |  69 +++
 .../org.apache.flink.table.factories.TableFactory  |   6 +-
 .../table/api/scala/BatchTableEnvironment.scala|   7 +-
 .../table/api/scala/StreamTableEnvironment.scala   | 132 
 .../internal/StreamTableEnvironmentImpl.scala  |  90 
 .../flink/table/executor/StreamExecutor.java   |   4 +-
 ...utorFactory.java => StreamExecutorFactory.java} |  49 +++--
 .../flink/table/planner/StreamPlannerFactory.java  |  70 +++
 .../org.apache.flink.table.factories.TableFactory  |   2 +
 .../flink/table/api/internal/TableEnvImpl.scala|   4 +-
 .../api/stream/StreamTableEnvironmentTest.scala|  26 ++-
 .../apache/flink/table/utils/TableTestBase.scala   |  76 +++
 29 files changed, 1052 insertions(+), 393 deletions(-)
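
The catalog and database names removed from the Python TableConfig below move to the
new EnvironmentSettings builder. A sketch of the intended Java-side usage; the builder
method names are assumptions based on the 1.9 API surface, not shown in this diff.

    import org.apache.flink.table.api.EnvironmentSettings;

    public class EnvironmentSettingsSketch {

        public static EnvironmentSettings buildSettings() {
            return EnvironmentSettings.newInstance()
                .useOldPlanner()            // or useBlinkPlanner(), selected via the discovery mechanism
                .inStreamingMode()
                .withBuiltInCatalogName("default_catalog")
                .withBuiltInDatabaseName("default_database")
                .build();
        }
    }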

diff --git a/flink-python/pyflink/table/table_config.py 
b/flink-python/pyflink/table/table_config.py
index 7eb3513..d6b5864 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -90,39 +90,3 @@ class TableConfig(object):
 
self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
 else:
 raise Exception("TableConfig.max_generated_code_length should be a 
int value!")
-
-def get_built_in_catalog_name(self):
-"""
-Gets the specified name of the initial catalog to be created when 
instantiating
-:class:`TableEnvironment`.
-"""
-return self._j_table_config.getBuiltInCatalogName()
-
-def set_built_in_catalog_name(self, built_in_catalog_name):
-"""
-Specifies the name of the initial catalog to be created when 
instantiating
-:class:`TableEnvironment`. This method has no effect if called on the
-:func:`~pyflink.table.TableEnvironment.get_config`.
-"""
-if built_in_catalog_name is not None and 
isinstance(built_in_catalog_name, str):
-self._j_table_config.setBuiltInCatalogName(built_in_catalog_name)
-else:
-raise Exception("TableConfig.built_in_catalog_name should be a 
string value!")
-
-def get_built_in_database_name(self):
-"""
-Gets the specified name of the default database in the initial catalog 
to be created when
-instantiating :class:`TableEnvironment`.
-"""
-return self._j_table_config.getBuiltInDatabaseName()
-
-def set_built_in_database_name(self, built_in_database_name):
-"""
-Specifies the name of the default database in the initial catalog to 
be created when
-instantiating :class:`TableEnvironment`. This method has no effect if 
called on the
-:func:`~pyflink.table.TableEnvironment.get_config`.
-"""
-if built_in_database_name is not None and 
isinstance(built_in_database_name, str):
-self._j_table_config.setBuiltInDatabaseName(built_in_database_name)
-else:
-raise Exception("TableConfig.built_in_database_name should be a 
string value!")
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py 
b/flink-python/pyflink/table/tests/test_table_environment_api.py
index a129d22..4eba1bb 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -170,8 +170,6 @@ class 
StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
 table_config.set

[flink] branch master updated (9fa61aa -> 95944e2)

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9fa61aa  [FLINK-13029][table-planner] Removed usages of 
ExpressionBridge in QueryOperation's factories
 new 1a224b3  [hotfix][table-common] Enable finding multiple matching 
TableFactories from TableFactoryService
 new 95944e2  [FLINK-12798][table] Add a discovery mechanism for switching 
between Flink/Blink Planner/Executor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-python/pyflink/table/table_config.py |  36 
 .../table/tests/test_table_environment_api.py  |  12 --
 .../client/gateway/local/ExecutionContextTest.java |   3 +-
 .../client/gateway/local/LocalExecutorITCase.java  |   7 +-
 .../table/api/java/BatchTableEnvironment.java  |   5 +-
 .../table/api/java/StreamTableEnvironment.java | 155 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |  66 +++---
 .../flink/table/api/EnvironmentSettings.java   | 228 +
 .../org/apache/flink/table/api/TableConfig.java|  44 
 .../apache/flink/table/api/TableEnvironment.java   |  22 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/delegation/ExecutorFactory.java|  50 +
 .../internal => delegation}/PlannerFactory.java|  49 ++---
 .../flink/table/factories/ComponentFactory.java}   |  32 ++-
 .../table/factories/ComponentFactoryService.java   |  80 
 .../factories/ComponentFactoryServiceTest.java |  68 ++
 .../factories/utils/OtherTestPlannerFactory.java   |  12 +-
 .../table/factories/utils/TestPlannerFactory.java  |  69 +++
 .../org.apache.flink.table.factories.TableFactory  |   3 +-
 .../table/api/scala/BatchTableEnvironment.scala|   7 +-
 .../table/api/scala/StreamTableEnvironment.scala   | 132 
 .../internal/StreamTableEnvironmentImpl.scala  |  90 
 .../table/api/AmbiguousTableFactoryException.java  |  36 ++--
 .../apache/flink/table/factories/TableFactory.java |   1 -
 .../flink/table/factories/TableFactoryService.java | 146 -
 .../flink/table/executor/StreamExecutor.java   |   4 +-
 ...utorFactory.java => StreamExecutorFactory.java} |  49 +++--
 .../flink/table/planner/StreamPlannerFactory.java  |  70 +++
 .../org.apache.flink.table.factories.TableFactory  |   2 +
 .../flink/table/api/internal/TableEnvImpl.scala|   4 +-
 .../api/stream/StreamTableEnvironmentTest.scala|  26 ++-
 .../factories/TableFormatFactoryServiceTest.scala  |   7 -
 .../apache/flink/table/utils/TableTestBase.scala   |  76 +++
 33 files changed, 1115 insertions(+), 480 deletions(-)
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
 rename 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/{api/internal
 => delegation}/PlannerFactory.java (55%)
 copy 
flink-table/{flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java
 => 
flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java}
 (52%)
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
 create mode 100644 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
 copy flink-core/src/main/java/org/apache/flink/util/SerializableObject.java => 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java
 (72%)
 create mode 100644 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
 copy {flink-connectors/flink-connector-kafka-0.10/src/main => 
flink-table/flink-table-api-java/src/test}/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 (86%)
 rename 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/{ExecutorFactory.java
 => StreamExecutorFactory.java} (51%)
 create mode 100644 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java



[flink] 01/02: [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a224b3e0b2e11fcd580edb5f8060f7ffd3c61a7
Author: Dawid Wysakowicz 
AuthorDate: Wed Jun 19 15:00:02 2019 +0200

[hotfix][table-common] Enable finding multiple matching TableFactories from 
TableFactoryService

This commit adds TableFactoryService.findAll methods that return TableFactories even
if they are ambiguous with respect to the requiredContext and supportedProperties.
Additionally it fixes minor issues and improves type handling.
---
 .../table/api/AmbiguousTableFactoryException.java  |  36 +++--
 .../apache/flink/table/factories/TableFactory.java |   1 -
 .../flink/table/factories/TableFactoryService.java | 146 ++---
 .../factories/TableFormatFactoryServiceTest.scala  |   7 -
 4 files changed, 117 insertions(+), 73 deletions(-)
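
A sketch of the lookup described in the commit message: findAll returns every factory
whose context and properties match instead of failing on an ambiguous result. The
exact generic signature is assumed here, since the new method body is not shown.

    import org.apache.flink.table.factories.TableFactory;
    import org.apache.flink.table.factories.TableFactoryService;

    import java.util.List;
    import java.util.Map;

    public class FindAllFactoriesSketch {

        /** Prints every factory matching the given properties instead of requiring a unique match. */
        public static void printMatches(Map<String, String> properties) {
            List<TableFactory> matches = TableFactoryService.findAll(TableFactory.class, properties);
            matches.forEach(factory -> System.out.println(factory.getClass().getName()));
        }
    }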

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
index 395d5c98..ea1c72e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
@@ -31,20 +31,20 @@ import java.util.stream.Collectors;
 public class AmbiguousTableFactoryException extends RuntimeException {
 
// factories that match the properties
-   private final List matchingFactories;
+   private final List matchingFactories;
// required factory class
-   private final Class factoryClass;
+   private final Class factoryClass;
// all found factories
private final List factories;
// properties that describe the configuration
private final Map properties;
 
public AmbiguousTableFactoryException(
-   List matchingFactories,
-   Class factoryClass,
-   List factories,
-   Map properties,
-   Throwable cause) {
+   List matchingFactories,
+   Class factoryClass,
+   List factories,
+   Map properties,
+   Throwable cause) {
 
super(cause);
this.matchingFactories = matchingFactories;
@@ -54,10 +54,10 @@ public class AmbiguousTableFactoryException extends 
RuntimeException {
}
 
public AmbiguousTableFactoryException(
-   List matchingFactories,
-   Class factoryClass,
-   List factories,
-   Map properties) {
+   List matchingFactories,
+   Class factoryClass,
+   List factories,
+   Map properties) {
 
this(matchingFactories, factoryClass, factories, properties, 
null);
}
@@ -70,15 +70,13 @@ public class AmbiguousTableFactoryException extends 
RuntimeException {
"The following properties are 
requested:\n%s\n\n" +
"The following factories have been 
considered:\n%s",
factoryClass.getName(),
-   String.join(
-   "\n",
-   matchingFactories.stream()
-   .map(p -> 
p.getClass().getName()).collect(Collectors.toList())),
+   matchingFactories.stream()
+   .map(p -> p.getClass().getName())
+   .collect(Collectors.joining("\n")),
DescriptorProperties.toString(properties),
-   String.join(
-   "\n",
-   factories.stream().map(p -> 
p.getClass().getName()).collect(Collectors.toList())
-   )
+   factories.stream()
+   .map(p -> p.getClass().getName())
+   .collect(Collectors.joining("\n"))
);
}
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
index b56ffa1..d4c0628 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
@@ -55,7 +55,6 @@ public interface TableFactory {
 */
Map requiredContext();
 
-
/**
 * List of property keys that this factory can handle. This method will 
be used for validation.
 * If a property i

[flink] branch master updated (036c549 -> 9fa61aa)

2019-07-02 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 036c549  [FLINK-13028][table-api-java] Merge flatten and call 
resolution rule
 add b9ec89b  [FLINK-13029][table-planner] Ported GROUP BY expression to 
new type system
 add 9fa61aa  [FLINK-13029][table-planner] Removed usages of 
ExpressionBridge in QueryOperation's factories

No new revisions were added by this update.

Summary of changes:
 .../utils/LegacyTypeInfoDataTypeConverter.java | 27 +-
 .../operations/AggregateOperationFactory.java  | 99 +++---
 .../table/operations/JoinOperationFactory.java | 19 ++---
 .../operations/OperationTreeBuilderFactory.java|  7 --
 .../operations/ProjectionOperationFactory.java | 19 ++---
 .../flink/table/api/internal/TableEnvImpl.scala|  1 -
 .../expressions/PlannerExpressionConverter.scala   | 42 +
 .../operations/OperationTreeBuilderImpl.scala  | 17 ++--
 .../flink/table/api/batch/table/CalcTest.scala | 44 ++
 .../api/validation/TableSourceValidationTest.scala | 45 --
 10 files changed, 181 insertions(+), 139 deletions(-)



[flink] 03/05: [FLINK-13028][table-api-java] Refactor local over windows

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c1968ac2880c3784c6c35ffb819932483208807
Author: Timo Walther 
AuthorDate: Mon Jul 1 09:12:41 2019 +0200

[FLINK-13028][table-api-java] Refactor local over windows
---
 .../flink/table/expressions/LocalOverWindow.java   | 76 ++
 .../table/expressions/ExpressionResolver.java  | 21 +++---
 .../expressions/rules/OverWindowResolverRule.java  | 16 ++---
 .../table/expressions/rules/ResolverRule.java  |  6 +-
 .../flink/table/plan/logical/groupWindows.scala| 15 +
 5 files changed, 98 insertions(+), 36 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
new file mode 100644
index 000..45e1a7a
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Local over window created during expression resolution.
+ */
+@Internal
+public final class LocalOverWindow {
+
+   private Expression alias;
+
+   private List partitionBy;
+
+   private Expression orderBy;
+
+   private Expression preceding;
+
+   private @Nullable Expression following;
+
+   LocalOverWindow(
+   Expression alias,
+   List partitionBy,
+   Expression orderBy,
+   Expression preceding,
+   @Nullable Expression following) {
+   this.alias = alias;
+   this.partitionBy = partitionBy;
+   this.orderBy = orderBy;
+   this.preceding = preceding;
+   this.following = following;
+   }
+
+   public Expression getAlias() {
+   return alias;
+   }
+
+   public List getPartitionBy() {
+   return partitionBy;
+   }
+
+   public Expression getOrderBy() {
+   return orderBy;
+   }
+
+   public Expression getPreceding() {
+   return preceding;
+   }
+
+   public Optional getFollowing() {
+   return Optional.ofNullable(following);
+   }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 015751c..ea45f33 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.expressions.rules.ResolverRules;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
@@ -100,13 +99,13 @@ public class ExpressionResolver {
 
private final Map localReferences;
 
-   private final Map overWindows;
+   private final Map localOverWindows;
 
private ExpressionResolver(
TableReferenceLookup tableLookup,
FunctionLookup functionLookup,
FieldReferenceLookup fieldLookup,
-   List overWindows,
+   List localOverWindows,
List localReferences) {
this.tableLookup = Preconditions.checkNotNull(tableLookup);
this.fieldLookup = Preconditions.checkNotNull(fieldLook

[flink] 04/05: [FLINK-13028][table] Refactor expression package structure

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 14fe641fc97888ccc7035051a697334ec39a0e62
Author: Timo Walther 
AuthorDate: Mon Jul 1 10:49:50 2019 +0200

[FLINK-13028][table] Refactor expression package structure
---
 .../table/api/OverWindowPartitionedOrdered.java |  2 +-
 .../table/api/internal/TableEnvironmentImpl.java|  2 +-
 .../apache/flink/table/api/internal/TableImpl.java  |  2 +-
 .../PlannerExpressionParser.java|  8 +---
 .../flink/table/expressions/ExpressionParser.java   |  1 +
 .../expressions/resolver}/ExpressionResolver.java   | 21 ++---
 .../expressions/{ => resolver}/LocalOverWindow.java |  5 ++---
 .../{ => resolver}/LookupCallResolver.java  |  6 +-
 .../resolver}/lookups/FieldReferenceLookup.java |  2 +-
 .../lookups/TableReferenceLookup.java   |  2 +-
 .../resolver}/rules/ExpandColumnFunctionsRule.java  |  8 
 .../resolver}/rules/LookupCallByNameRule.java   |  4 ++--
 .../resolver}/rules/OverWindowResolverRule.java |  8 
 .../rules/QualifyBuiltInFunctionsRule.java  |  2 +-
 .../resolver}/rules/ReferenceResolverRule.java  |  7 ---
 .../resolver}/rules/ResolveCallByArgumentsRule.java |  2 +-
 .../resolver}/rules/ResolveFlattenCallRule.java |  4 ++--
 .../expressions/resolver}/rules/ResolverRule.java   | 10 +-
 .../expressions/resolver}/rules/ResolverRules.java  |  2 +-
 .../resolver}/rules/RuleExpressionVisitor.java  |  6 +++---
 .../rules/StarReferenceFlatteningRule.java  |  2 +-
 .../{ => utils}/ApiExpressionDefaultVisitor.java| 13 -
 .../expressions/{ => utils}/ApiExpressionUtils.java | 11 ++-
 .../ResolvedExpressionDefaultVisitor.java   | 10 +-
 .../table/operations/OperationExpressionsUtils.java | 10 +-
 .../flink/table/typeutils/FieldInfoUtils.java   |  2 +-
 .../flink/table/operations/QueryOperationTest.java  |  2 +-
 .../flink/table/expressions/ExpressionBuilder.java  |  1 +
 .../functions/aggfunctions/AvgAggFunction.java  |  2 +-
 .../functions/aggfunctions/ConcatAggFunction.java   |  2 +-
 .../functions/aggfunctions/Count1AggFunction.java   |  2 +-
 .../functions/aggfunctions/CountAggFunction.java|  2 +-
 .../aggfunctions/DeclarativeAggregateFunction.java  |  2 +-
 .../functions/aggfunctions/IncrSumAggFunction.java  |  2 +-
 .../aggfunctions/IncrSumWithRetractAggFunction.java |  2 +-
 .../functions/aggfunctions/LeadLagAggFunction.java  |  2 +-
 .../functions/aggfunctions/MaxAggFunction.java  |  2 +-
 .../functions/aggfunctions/MinAggFunction.java  |  2 +-
 .../functions/aggfunctions/RankAggFunction.java |  2 +-
 .../aggfunctions/RankLikeAggFunctionBase.java   |  2 +-
 .../aggfunctions/RowNumberAggFunction.java  |  2 +-
 .../aggfunctions/SingleValueAggFunction.java|  2 +-
 .../functions/aggfunctions/Sum0AggFunction.java |  2 +-
 .../functions/aggfunctions/SumAggFunction.java  |  2 +-
 .../aggfunctions/SumWithRetractAggFunction.java |  2 +-
 .../table/codegen/agg/DeclarativeAggCodeGen.scala   |  4 ++--
 .../table/codegen/agg/batch/AggCodeGenHelper.scala  |  3 ++-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala|  3 ++-
 .../logical/LogicalWindowAggregateRuleBase.scala|  2 +-
 .../flink/table/plan/util/RexNodeExtractor.scala|  2 +-
 .../flink/table/sources/TableSourceUtil.scala   |  2 +-
 .../table/sources/tsextractors/ExistingField.scala  |  2 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala   |  2 +-
 .../table/plan/util/RexNodeExtractorTest.scala  |  2 +-
 .../apache/flink/table/util/testTableSources.scala  |  2 +-
 .../table/operations/AggregateOperationFactory.java |  6 +++---
 .../flink/table/operations/AliasOperationUtils.java |  8 
 .../table/operations/CalculatedTableFactory.java|  4 ++--
 .../table/operations/ColumnOperationUtils.java  |  6 +++---
 .../table/operations/JoinOperationFactory.java  |  2 +-
 .../operations/OperationTreeBuilderFactory.java |  2 +-
 .../operations/ProjectionOperationFactory.java  |  4 ++--
 .../table/operations/SortOperationFactory.java  |  4 ++--
 .../flink/table/plan/QueryOperationConverter.java   |  4 ++--
 .../table/api/internal/BatchTableEnvImpl.scala  |  3 ++-
 .../flink/table/api/internal/TableEnvImpl.scala |  2 +-
 .../flink/table/api/scala/expressionDsl.scala   |  2 +-
 .../flink/table/expressions/ExpressionBridge.scala  |  1 +
 .../expressions/PlannerExpressionParserImpl.scala   |  3 ++-
 .../table/operations/OperationTreeBuilderImpl.scala |  8 +---
 .../flink/table/plan/util/RexProgramExtractor.scala |  2 +-
 .../flink/table/expressions/KeywordParseTest.scala  |  2 +-
 72 files changed, 163 insertions(+), 113 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apa

[flink] 02/05: [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 272ac71328aac0583df859bf197e30afcec0981f
Author: Timo Walther 
AuthorDate: Mon Jul 1 08:25:05 2019 +0200

[FLINK-13028][table-api-java] Remove planner expression from 
ExpressionResolver
---
 .../table/expressions/ExpressionResolver.java  | 98 +++---
 .../expressions/rules/OverWindowResolverRule.java  | 50 ++-
 .../rules/ResolveCallByArgumentsRule.java  | 26 --
 ...enCallRule.java => ResolveFlattenCallRule.java} | 39 +
 .../table/expressions/rules/ResolverRule.java  |  6 --
 .../table/expressions/rules/ResolverRules.java |  4 +-
 .../table/validation/CalcValidationTest.scala  | 12 +++
 7 files changed, 119 insertions(+), 116 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 394180b..015751c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -19,14 +19,9 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
-import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
-import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
@@ -36,15 +31,9 @@ import 
org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.plan.logical.LogicalOverWindow;
-import org.apache.flink.table.plan.logical.LogicalWindow;
-import org.apache.flink.table.plan.logical.SessionGroupWindow;
-import org.apache.flink.table.plan.logical.SlidingGroupWindow;
-import org.apache.flink.table.plan.logical.TumblingGroupWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,11 +43,8 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import scala.Some;
-
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
-import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Tries to resolve all unresolved expressions such as {@link 
UnresolvedReferenceExpression}
@@ -97,15 +83,13 @@ public class ExpressionResolver {
ResolverRules.EXPAND_COLUMN_FUNCTIONS,
ResolverRules.OVER_WINDOWS,
ResolverRules.FIELD_RESOLVE,
-   ResolverRules.FLATTEN_CALL,
ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
-   ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
+   ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
+   ResolverRules.FLATTEN_CALL);
}
 
private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR 
= new VerifyResolutionVisitor();
 
-   private final PlannerExpressionConverter bridgeConverter = 
PlannerExpressionConverter.INSTANCE();
-
private final FieldReferenceLookup fieldLookup;
 
private final TableReferenceLookup tableLookup;
@@ -187,54 +171,6 @@ public class ExpressionResolver {
}
 
/**
-* Converts an API class to a logical window for planning with 
expressions already resolved.
-*
-* @param window window to resolve
-* @return logical window with expressions resolved
-*/
-   public LogicalWindow resolveGroupWindow(GroupWindow window) {
-   Expression alias = window.getAlias();
-
-   if (!(alias instanceof UnresolvedReferenceExpression)) {
-   throw new ValidationException("Alias of group window 
should be an UnresolvedFieldReference");
-   }
-
-   final String windowName = ((UnresolvedReferenceExpression) 

[flink] 05/05: [FLINK-13028][table-api-java] Merge flatten and call resolution rule

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 036c5492a9bfe7636d5e7d5eb664af5e77952707
Author: Timo Walther 
AuthorDate: Mon Jul 1 15:25:04 2019 +0200

[FLINK-13028][table-api-java] Merge flatten and call resolution rule
---
 .../expressions/resolver/ExpressionResolver.java   |   3 +-
 .../resolver/rules/ResolveCallByArgumentsRule.java |  94 ---
 .../resolver/rules/ResolveFlattenCallRule.java | 101 -
 .../expressions/resolver/rules/ResolverRules.java  |   5 -
 .../flink/table/api/batch/table/CalcTest.scala |   1 +
 .../table/validation/CalcValidationTest.scala  |  12 ---
 6 files changed, 62 insertions(+), 154 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 4c1e62d..e4c07fa 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -90,8 +90,7 @@ public class ExpressionResolver {
ResolverRules.OVER_WINDOWS,
ResolverRules.FIELD_RESOLVE,
ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
-   ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
-   ResolverRules.FLATTEN_CALL);
+   ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
}
 
private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR 
= new VerifyResolutionVisitor();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 73429f4..1f184c8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -38,21 +40,26 @@ import 
org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the 
given arguments and infers
  * the output data type. All function calls are resolved {@link 
CallExpression} after applying this
- * rule except for the special case of {@link 
BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * This rule also resolves {@code flatten()} calls on composite types.
  *
  * If the call expects different types of arguments, but the given 
arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -60,39 +67,27 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
@Override
public List apply(List expression, 
ResolutionContext context) {
return expression.stream()
-   .map(expr -> expr.accept(new 
CallArgumentsCastingVisitor(context)))
+   .flatMap(expr -> expr.accept(new 
ResolvingCallVisitor(context)).stream())
.collect(Collectors.toList());
}
 
-   private class CallArgumentsCastingVisitor extends 
RuleExpressionVisitor {
+   // --------------------------------------------------------------------------------------------
+
+   private class ResolvingCallVisitor extends 
RuleExpressionVisit

[flink] branch master updated (c3f2ad8 -> 036c549)

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c3f2ad8  [FLINK-12809][table-api] Introduce PartitionableTableSink for 
supporting writing data into partitions
 new 6161043  [FLINK-13028][table-api-java] Extract legacy type inference 
logic
 new 272ac71  [FLINK-13028][table-api-java] Remove planner expression from 
ExpressionResolver
 new 8c1968a  [FLINK-13028][table-api-java] Refactor local over windows
 new 14fe641  [FLINK-13028][table] Refactor expression package structure
 new 036c549  [FLINK-13028][table-api-java] Merge flatten and call 
resolution rule

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/api/OverWindowPartitionedOrdered.java|   2 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   2 +-
 .../apache/flink/table/api/internal/TableImpl.java |   2 +-
 .../PlannerExpressionParser.java   |   8 +-
 .../table/delegation/PlannerTypeInferenceUtil.java |  75 
 .../flink/table/expressions/ExpressionParser.java  |   1 +
 .../expressions/resolver}/ExpressionResolver.java  | 137 --
 .../expressions/resolver/LocalOverWindow.java  |  75 
 .../{ => resolver}/LookupCallResolver.java |   6 +-
 .../resolver}/lookups/FieldReferenceLookup.java|   2 +-
 .../lookups/TableReferenceLookup.java  |   2 +-
 .../resolver}/rules/ExpandColumnFunctionsRule.java |   8 +-
 .../resolver}/rules/LookupCallByNameRule.java  |   4 +-
 .../resolver}/rules/OverWindowResolverRule.java|  68 +--
 .../rules/QualifyBuiltInFunctionsRule.java |   2 +-
 .../resolver}/rules/ReferenceResolverRule.java |   7 +-
 .../rules/ResolveCallByArgumentsRule.java  | 209 +
 .../expressions/resolver}/rules/ResolverRule.java  |  20 +-
 .../expressions/resolver}/rules/ResolverRules.java |   7 +-
 .../resolver}/rules/RuleExpressionVisitor.java |   6 +-
 .../rules/StarReferenceFlatteningRule.java |   2 +-
 .../{ => utils}/ApiExpressionDefaultVisitor.java   |  13 +-
 .../{ => utils}/ApiExpressionUtils.java|  11 +-
 .../ResolvedExpressionDefaultVisitor.java  |  10 +-
 .../operations/OperationExpressionsUtils.java  |  10 +-
 .../flink/table/typeutils/FieldInfoUtils.java  |   2 +-
 .../flink/table/operations/QueryOperationTest.java |   2 +-
 .../flink/table/expressions/ExpressionBuilder.java |   1 +
 .../functions/aggfunctions/AvgAggFunction.java |   2 +-
 .../functions/aggfunctions/ConcatAggFunction.java  |   2 +-
 .../functions/aggfunctions/Count1AggFunction.java  |   2 +-
 .../functions/aggfunctions/CountAggFunction.java   |   2 +-
 .../aggfunctions/DeclarativeAggregateFunction.java |   2 +-
 .../functions/aggfunctions/IncrSumAggFunction.java |   2 +-
 .../IncrSumWithRetractAggFunction.java |   2 +-
 .../functions/aggfunctions/LeadLagAggFunction.java |   2 +-
 .../functions/aggfunctions/MaxAggFunction.java |   2 +-
 .../functions/aggfunctions/MinAggFunction.java |   2 +-
 .../functions/aggfunctions/RankAggFunction.java|   2 +-
 .../aggfunctions/RankLikeAggFunctionBase.java  |   2 +-
 .../aggfunctions/RowNumberAggFunction.java |   2 +-
 .../aggfunctions/SingleValueAggFunction.java   |   2 +-
 .../functions/aggfunctions/Sum0AggFunction.java|   2 +-
 .../functions/aggfunctions/SumAggFunction.java |   2 +-
 .../aggfunctions/SumWithRetractAggFunction.java|   2 +-
 .../table/codegen/agg/DeclarativeAggCodeGen.scala  |   4 +-
 .../table/codegen/agg/batch/AggCodeGenHelper.scala |   3 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |   3 +-
 .../logical/LogicalWindowAggregateRuleBase.scala   |   2 +-
 .../flink/table/plan/util/RexNodeExtractor.scala   |   2 +-
 .../flink/table/sources/TableSourceUtil.scala  |   2 +-
 .../table/sources/tsextractors/ExistingField.scala |   2 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   2 +-
 .../table/plan/util/RexNodeExtractorTest.scala |   2 +-
 .../apache/flink/table/util/testTableSources.scala |   2 +-
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 140 ++
 .../table/expressions/rules/FlattenCallRule.java   |  90 -
 .../operations/AggregateOperationFactory.java  |   6 +-
 .../table/operations/AliasOperationUtils.java  |   8 +-
 .../table/operations/CalculatedTableFactory.java   |   4 +-
 .../table/operations/ColumnOperationUtils.java |   6 +-
 .../table/operations/JoinOperationFactory.java |   2 +-
 .../operations/OperationTreeBuilderFactory.java|   2 +-
 .../operations/ProjectionOperationFactory.java |   4 +-
 .../table/operations/SortOperationFactory.java |   4 

[flink] 01/05: [FLINK-13028][table-api-java] Extract legacy type inference logic

2019-07-02 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6161043a2afbd458f2587f3f63df60f620c755b7
Author: Timo Walther 
AuthorDate: Fri Jun 28 13:34:29 2019 +0200

[FLINK-13028][table-api-java] Extract legacy type inference logic
---
 .../table/delegation/PlannerTypeInferenceUtil.java |  75 +++
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 140 +
 .../rules/ResolveCallByArgumentsRule.java  | 127 ---
 3 files changed, 237 insertions(+), 105 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
new file mode 100644
index 000..4e003ff
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+/**
+ * Temporary utility for validation and output type inference until all {@code 
PlannerExpression} are
+ * upgraded to work with {@link TypeInferenceUtil}.
+ */
+@Internal
+public interface PlannerTypeInferenceUtil {
+
+   static PlannerTypeInferenceUtil create() {
+   return 
SingletonPlannerTypeInferenceUtil.getPlannerTypeInferenceUtil();
+   }
+
+   /**
+* Same behavior as {@link 
TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}.
+*/
+   TypeInferenceUtil.Result runTypeInference(
+   UnresolvedCallExpression unresolvedCall,
+   List resolvedArgs);
+
+   /**
+* A singleton-pattern utility that avoids creating many instances of {@link PlannerTypeInferenceUtil}.
+*/
+   class SingletonPlannerTypeInferenceUtil {
+
+   private static PlannerTypeInferenceUtil 
plannerTypeInferenceUtil;
+
+   public static PlannerTypeInferenceUtil 
getPlannerTypeInferenceUtil() {
+   if (plannerTypeInferenceUtil == null) {
+   try {
+   final Class clazz =
+   
Class.forName("org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl");
+   final Constructor con = 
clazz.getConstructor();
+   plannerTypeInferenceUtil = 
(PlannerTypeInferenceUtil) con.newInstance();
+   } catch (Throwable t) {
+   throw new TableException("Instantiation 
of PlannerTypeInferenceUtil failed.", t);
+   }
+   }
+   return plannerTypeInferenceUtil;
+   }
+
+   private SingletonPlannerTypeInferenceUtil() {
+   // no instantiation
+   }
+   }
+}
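
For orientation, here is a minimal sketch of how a caller might obtain and use this utility; the wrapping class, method name, and both parameters are illustrative assumptions, while create() and runTypeInference(...) are the members defined in the interface above.

    import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
    import org.apache.flink.table.expressions.ResolvedExpression;
    import org.apache.flink.table.expressions.UnresolvedCallExpression;
    import org.apache.flink.table.types.inference.TypeInferenceUtil;

    import java.util.List;

    class LegacyTypeInferenceSketch {

        // Delegates legacy validation and output type inference for an unresolved call
        // to the planner-provided implementation loaded reflectively by create().
        static TypeInferenceUtil.Result inferLegacyType(
                UnresolvedCallExpression unresolvedCall,
                List<ResolvedExpression> resolvedArgs) {
            return PlannerTypeInferenceUtil.create()
                    .runTypeInference(unresolvedCall, resolvedArgs);
        }
    }
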
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
new file mode 100644
index 000..30c3421
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additiona

[flink] 03/03: [FLINK-12809][table-api] Introduce PartitionableTableSink for supporting writing data into partitions

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3f2ad8c8c10097dc7f680ccdf2e9b99babc349c
Author: 云邪 
AuthorDate: Wed Jun 12 11:01:40 2019 +0800

[FLINK-12809][table-api] Introduce PartitionableTableSink for supporting 
writing data into partitions
---
 .../flink/table/sinks/PartitionableTableSink.java  | 112 +
 1 file changed, 112 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
new file mode 100644
index 000..5e7c26c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An interface for partitionable {@link TableSink}. A partitionable sink can write
+ * query results to partitions.
+ *
+ * Partition columns are defined via {@link #getPartitionFieldNames()} and 
the field names
+ * should be sorted in a strict order. And all the partition fields should 
exist in the
+ * {@link TableSink#getTableSchema()}.
+ *
+ * For example, a partitioned table named {@code my_table} with a table 
schema
+ * {@code [a INT, b VARCHAR, c DOUBLE, dt VARCHAR, country VARCHAR]} is 
partitioned on columns
+ * {@code dt, country}. Then {@code dt} is the first partition column, and
+ * {@code country} is the secondary partition column.
+ *
+ * We can insert data into table partitions using INSERT INTO PARTITION 
syntax, for example:
+ * 
+ * 
+ * INSERT INTO my_table PARTITION (dt='2019-06-20', country='bar') select 
a, b, c from my_view
+ * 
+ * 
+ * When all the partition columns are assigned a value in the PARTITION clause, the
+ * statement inserts into a static partition. The query result is written into that
+ * static partition, i.e. {@code dt='2019-06-20', country='bar'}. The user-specified
+ * static partitions are passed to the sink via {@link #setStaticPartition(Map)}.
+ *
+ * The INSERT INTO PARTITION syntax also supports dynamic partition inserts.
+ * 
+ * 
+ * INSERT INTO my_table PARTITION (dt='2019-06-20') select a, b, c, 
country from another_view
+ * 
+ * 
+ * When only a prefix of the partition columns is assigned a value in the PARTITION
+ * clause, the query result is written into a dynamic partition. In the above example,
+ * the static partition part is {@code dt='2019-06-20'}, which is passed to the sink via
+ * {@link #setStaticPartition(Map)}, while {@code country} is the dynamic partition
+ * column whose value is taken from each record.
+ */
+@Experimental
+public interface PartitionableTableSink {
+
+   /**
+* Gets the partition field names of the table. The partition field 
names should be sorted in
+* a strict order, i.e. they have the order as specified in the 
PARTITION statement in DDL.
+* This should be an empty set if the table is not partitioned.
+*
+* All the partition fields should exist in the {@link 
TableSink#getTableSchema()}.
+*
+* @return partition field names of the table, empty if the table is 
not partitioned.
+*/
+   List getPartitionFieldNames();
+
+   /**
+* Sets the static partition into the {@link TableSink}. The static partition may cover
+* only a subset of all partition columns. See the class Javadoc for more details.
+*
+* The static partition is represented as a {@code Map} which maps from
+* partition field name to partition value. The partition values are 
all encoded as strings,
+* i.e. encoded using String.valueOf(...). For example, if we have a 
static partition
+* {@code f0=1024, f1="foo", f2="bar"}. f0 is an integer type, f1 and 
f2 are string types.
+* They will all be encoded as strings: "1024", "foo", "bar". And can 
be decoded to origi
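
To make the contract concrete, below is a minimal, hedged sketch of a sink shaped like this interface; the class name, the hard-coded partition columns, and the logging are illustrative only, and since only part of the interface is quoted above, this is a shape sketch rather than a claimed implementor of the full interface.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Shape sketch only: a real sink would implement PartitionableTableSink (and TableSink)
    // and actually write records; here we just mirror the two members shown above.
    public class LoggingPartitionedSinkSketch {

        // Partition columns in the strict order declared in the DDL PARTITION statement.
        private final List<String> partitionFields = Arrays.asList("dt", "country");

        // Static partition spec handed over by the planner, values encoded via String.valueOf(...).
        private Map<String, String> staticPartition = new HashMap<>();

        public List<String> getPartitionFieldNames() {
            return partitionFields;
        }

        public void setStaticPartition(Map<String, String> partitions) {
            // e.g. {dt=2019-06-20, country=bar} for a fully static insert,
            // or {dt=2019-06-20} when 'country' is a dynamic partition column.
            this.staticPartition = new HashMap<>(partitions);
            System.out.println("static partition: " + staticPartition);
        }
    }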

[flink] branch master updated (a6d72fe -> c3f2ad8)

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a6d72fe  [FLINK-12937][table-planner-blink] Introduce join reorder 
planner rules in blink planner
 new 6e48056  [FLINK-12805][table-api] Introduce PartitionableTableSource 
for partition pruning
 new fb111ef  [FLINK-12808][table-api] Introduce OverwritableTableSink for 
supporting insert overwrite
 new c3f2ad8  [FLINK-12809][table-api] Introduce PartitionableTableSink for 
supporting writing data into partitions

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/sinks/OverwritableTableSink.java}  |  21 ++--
 .../flink/table/sinks/PartitionableTableSink.java  | 112 +
 .../table/sources/PartitionableTableSource.java|  68 +
 3 files changed, 191 insertions(+), 10 deletions(-)
 copy 
flink-table/{flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
 => 
flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java}
 (65%)
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java



[flink] 02/03: [FLINK-12808][table-api] Introduce OverwritableTableSink for supporting insert overwrite

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb111ef22b8f87dd25e61160678fc8857dffcfe0
Author: 云邪 
AuthorDate: Wed Jun 12 11:01:20 2019 +0800

[FLINK-12808][table-api] Introduce OverwritableTableSink for supporting 
insert overwrite
---
 .../flink/table/sinks/OverwritableTableSink.java   | 36 ++
 1 file changed, 36 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java
new file mode 100644
index 000..6b46ef2
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * A {@link TableSink} that supports INSERT OVERWRITE should implement this interface.
+ * INSERT OVERWRITE will overwrite any existing data in the table or partition.
+ *
+ * @see PartitionableTableSink for the definition of partition.
+ */
+@Experimental
+public interface OverwritableTableSink {
+
+   /**
+* Configures whether the insert should overwrite existing data or not.
+*/
+   void setOverwrite(boolean overwrite);
+}
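
As a small, hedged illustration of the contract above, the following sketch shows a sink that records the overwrite flag; the class name and the extra getter are assumptions, and only setOverwrite(boolean) comes from the interface.

    import org.apache.flink.table.sinks.OverwritableTableSink;

    // A minimal sketch: a real implementation would also extend TableSink and act on
    // the flag when emitting data; here we only remember the requested behavior.
    public class OverwriteAwareSinkSketch implements OverwritableTableSink {

        private boolean overwrite;

        @Override
        public void setOverwrite(boolean overwrite) {
            // true  -> existing data in the target table or partition is replaced
            // false -> new rows are appended
            this.overwrite = overwrite;
        }

        public boolean isOverwrite() {
            return overwrite;
        }
    }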



[flink] 01/03: [FLINK-12805][table-api] Introduce PartitionableTableSource for partition pruning

2019-07-02 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6e4805673739fa1f8e85389d38ef6f3de20fc005
Author: 云邪 
AuthorDate: Wed Jun 12 11:01:01 2019 +0800

[FLINK-12805][table-api] Introduce PartitionableTableSource for partition 
pruning
---
 .../table/sources/PartitionableTableSource.java| 68 ++
 1 file changed, 68 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java
new file mode 100644
index 000..591ec50
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.sinks.TableSink;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An interface for partitionable {@link TableSource}.
+ *
+ * A {@link PartitionableTableSource} can exclude partitions from reading, 
which
+ * includes skipping the metadata. This is especially useful when there are 
thousands
+ * of partitions in a table.
+ *
+ * A partition is represented as a {@code Map} which maps 
from partition
+ * field name to partition value. Since the map is NOT ordered, the correct 
order of partition
+ * fields should be obtained via {@link #getPartitionFieldNames()}.
+ */
+@Experimental
+public interface PartitionableTableSource {
+
+   /**
+* Returns all the partitions of this {@link PartitionableTableSource}.
+*/
+   List> getPartitions();
+
+   /**
+* Gets the partition field names of the table. The partition field 
names should be sorted in
+* a strict order, i.e. they have the order as specified in the 
PARTITION statement in DDL.
+* This should be an empty set if the table is not partitioned.
+*
+* All the partition fields should exist in the {@link 
TableSink#getTableSchema()}.
+*
+* @return partition field names of the table, empty if the table is 
not partitioned.
+*/
+   List getPartitionFieldNames();
+
+   /**
+* Applies the remaining partitions to the table source. The {@code remainingPartitions}
+* are the remaining partitions of {@link #getPartitions()} after partition pruning
+* has been applied.
+*
+* After partition pruning has been applied, a new {@link TableSource} instance
+* should be returned that reads only the remaining partitions.
+*
+* @param remainingPartitions Remaining partitions after partition 
pruning applied.
+* @return A new cloned instance of {@link TableSource} that reads only the remaining partitions.
+*/
+   TableSource applyPartitionPruning(List> 
remainingPartitions);
+}
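
For illustration, a hedged sketch of how a planner-side caller could prune partitions and hand the survivors back to the source; the wrapping class, the method, and the 'dt' filter are assumptions, as is reading the erased generics above as Map<String, String> (which is what the class Javadoc describes), while getPartitions() and applyPartitionPruning(...) come from the interface.

    import org.apache.flink.table.sources.PartitionableTableSource;
    import org.apache.flink.table.sources.TableSource;

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    class PartitionPruningSketch {

        // Keeps only partitions whose 'dt' value matches the given date and asks the
        // source for a new instance that reads just those remaining partitions.
        static TableSource pruneByDate(PartitionableTableSource source, String dt) {
            List<Map<String, String>> remaining = source.getPartitions().stream()
                    .filter(partition -> dt.equals(partition.get("dt")))
                    .collect(Collectors.toList());
            return source.applyPartitionPruning(remaining);
        }
    }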



[flink] branch master updated: [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner

2019-07-02 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a6d72fe  [FLINK-12937][table-planner-blink] Introduce join reorder 
planner rules in blink planner
a6d72fe is described below

commit a6d72fed708e99404c96ffeca6ad70e08963e9cd
Author: godfreyhe 
AuthorDate: Sat Jun 22 12:50:06 2019 +0800

[FLINK-12937][table-planner-blink] Introduce join reorder planner rules in 
blink planner

This closes #8832
---
 .../flink/table/api/PlannerConfigOptions.java  |   6 +
 .../plan/optimize/program/FlinkBatchProgram.scala  |  22 +-
 .../plan/optimize/program/FlinkStreamProgram.scala |  23 +-
 .../table/plan/rules/FlinkBatchRuleSets.scala  |  16 +
 .../table/plan/rules/FlinkStreamRuleSets.scala |  16 +
 .../logical/RewriteMultiJoinConditionRule.scala| 129 +
 .../table/plan/batch/sql/join/JoinReorderTest.xml  | 600 +++
 .../logical/RewriteMultiJoinConditionRuleTest.xml  | 318 +++
 .../table/plan/stream/sql/join/JoinReorderTest.xml | 635 +
 .../plan/batch/sql/join/JoinReorderTest.scala  |  25 +
 .../table/plan/common/JoinReorderTestBase.scala| 233 
 .../FlinkAggregateInnerJoinTransposeRuleTest.scala |   2 +-
 .../logical/FlinkJoinPushExpressionsRuleTest.scala |  10 +-
 .../RewriteMultiJoinConditionRuleTest.scala| 153 +
 .../plan/stream/sql/join/JoinReorderTest.scala |  25 +
 15 files changed, 2208 insertions(+), 5 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
index 7b05df1..5e7a1a3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
@@ -137,4 +137,10 @@ public class PlannerConfigOptions {
.defaultValue(true)
.withDescription("Allow trying to push 
predicate down to a FilterableTableSource. " +
"the default value is true, 
means allow the attempt.");
+
+   public static final ConfigOption 
SQL_OPTIMIZER_JOIN_REORDER_ENABLED =
+   key("sql.optimizer.join-reorder.enabled")
+   .defaultValue(false)
+   .withDescription("Enables join reorder 
in optimizer cbo. Default is disabled.");
+
 }
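
As a usage note, a minimal sketch of switching the new option on through a Configuration object; how that Configuration reaches the optimizer (for example via the TableConfig) is not shown here and is an assumption about the planner setup.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.PlannerConfigOptions;

    class EnableJoinReorderSketch {

        static Configuration plannerConf() {
            Configuration conf = new Configuration();
            // Join reorder is disabled by default; opt in explicitly.
            conf.setBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED, true);
            return conf;
        }
    }
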
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 090b4fb..e73fde6 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.plan.optimize.program
 
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.PlannerConfigOptions
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
 
@@ -33,6 +34,7 @@ object FlinkBatchProgram {
   val DECORRELATE = "decorrelate"
   val DEFAULT_REWRITE = "default_rewrite"
   val PREDICATE_PUSHDOWN = "predicate_pushdown"
+  val JOIN_REORDER = "join_reorder"
   val JOIN_REWRITE = "join_rewrite"
   val WINDOW = "window"
   val LOGICAL = "logical"
@@ -120,7 +122,7 @@ object FlinkBatchProgram {
 .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
 .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES)
 .build(), "other predicate rewrite")
-.setIterations(5).build())
+.setIterations(5).build(), "predicate rewrite")
 .addProgram(
   FlinkHepRuleSetProgramBuilder.newBuilder
 .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
@@ -135,6 +137,24 @@ object FlinkBatchProgram {
 .build(), "prune empty after predicate push down")
 .build())
 
+// join reorder
+if 
(config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+  chainedProgram.addLast(
+JOIN_REORDER,
+FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+  .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+

[flink] branch master updated: [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join

2019-07-02 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 0119afa  [FLINK-12703][table-planner-blink] Introduce metadata 
handlers on SEMI/ANTI join and lookup join
0119afa is described below

commit 0119afaf28da2cf39e772b4fb568812ad09cfa79
Author: godfreyhe 
AuthorDate: Sat Jun 1 17:44:31 2019 +0800

[FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI 
join and lookup join

This closes #8588
---
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |  10 ++
 .../plan/metadata/FlinkRelMdColumnNullCount.scala  |  10 ++
 .../metadata/FlinkRelMdColumnOriginNullCount.scala |   4 +-
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  18 ++-
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  13 +-
 .../flink/table/plan/metadata/FlinkRelMdSize.scala |   7 +-
 .../plan/metadata/FlinkRelMdUniqueGroups.scala |   8 +-
 .../table/plan/metadata/FlinkRelMdUniqueKeys.scala |  19 ++-
 .../table/plan/batch/sql/join/LookupJoinTest.scala |   1 -
 .../metadata/FlinkRelMdColumnIntervalTest.scala|  23 +++
 .../metadata/FlinkRelMdColumnNullCountTest.scala   |  14 ++
 .../FlinkRelMdColumnOriginNullCountTest.scala  |  10 +-
 .../metadata/FlinkRelMdColumnUniquenessTest.scala  |  37 +
 .../metadata/FlinkRelMdDistinctRowCountTest.scala  |  18 +++
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 174 -
 .../FlinkRelMdModifiedMonotonicityTest.scala   |   3 +
 .../FlinkRelMdPercentageOriginalRowsTest.scala |   4 +
 .../metadata/FlinkRelMdPopulationSizeTest.scala|  14 ++
 .../plan/metadata/FlinkRelMdRowCountTest.scala |  17 ++
 .../plan/metadata/FlinkRelMdSelectivityTest.scala  |  12 ++
 .../table/plan/metadata/FlinkRelMdSizeTest.scala   |   5 +
 .../plan/metadata/FlinkRelMdUniqueGroupsTest.scala |  45 +-
 .../plan/metadata/FlinkRelMdUniqueKeysTest.scala   |  23 +++
 .../table/plan/metadata/MetadataTestUtil.scala |  19 ++-
 24 files changed, 476 insertions(+), 32 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
index 20d4610..ed13607 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -101,6 +101,16 @@ class FlinkRelMdColumnInterval private extends 
MetadataHandler[ColumnInterval] {
   }
 
   /**
+* Gets interval of the given column on Snapshot.
+*
+* @param snapshot Snapshot RelNode
+* @param mq RelMetadataQuery instance
+* @param index the index of the given column
+* @return interval of the given column on Snapshot.
+*/
+  def getColumnInterval(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): 
ValueInterval = null
+
+  /**
 * Gets interval of the given column on Project.
 *
 * Note: Only support the simple RexNode, e.g RexInputRef.
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
index ec7a59c..673f1cd 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
@@ -68,6 +68,16 @@ class FlinkRelMdColumnNullCount private extends 
MetadataHandler[ColumnNullCount]
   }
 
   /**
+* Gets the null count of the given column on Snapshot.
+*
+* @param snapshot Snapshot RelNode
+* @param mq RelMetadataQuery instance
+* @param index the index of the given column
+* @return the null count of the given column on Snapshot.
+*/
+  def getColumnNullCount(snapshot: Snapshot, mq: RelMetadataQuery, index: 
Int): JDouble = null
+
+  /**
 * Gets the null count of the given column in Project.
 *
 * @param project Project RelNode
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
index 6b88b9b..61bb21b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/fl