[flink] branch master updated (d81ac48 -> 886b01d)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d81ac48 [hotfix][docs] remove duplicate `to` in state doc new d1b22b5 [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflit new 886b01d [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../expressions/PlannerTypeInferenceUtilImpl.java | 142 .../table/functions/sql/FlinkSqlOperatorTable.java | 1 + .../table/api/ExpressionParserException.scala} | 0 .../flink/table/calcite/FlinkRelBuilder.scala | 4 +- .../flink/table/calcite/FlinkTypeFactory.scala | 21 +- .../codegen/agg/AggsHandlerCodeGenerator.scala | 14 +- .../agg/batch/HashWindowCodeGenerator.scala| 4 +- .../agg/batch/SortWindowCodeGenerator.scala| 4 +- .../codegen/agg/batch/WindowCodeGenerator.scala| 4 +- ...nce.scala => ExestingFieldFieldReference.scala} | 2 +- .../flink/table/expressions/ExpressionBridge.scala | 0 .../flink/table/expressions/InputTypeSpec.scala| 0 .../table/expressions/PlannerExpression.scala | 0 .../expressions/PlannerExpressionConverter.scala | 836 + .../expressions/PlannerExpressionParserImpl.scala | 726 ++ .../table/expressions/PlannerExpressionUtils.scala | 0 .../flink/table/expressions/aggregations.scala | 439 +++ .../flink/table/expressions/arithmetic.scala | 165 .../org/apache/flink/table/expressions/call.scala | 326 .../org/apache/flink/table/expressions/cast.scala | 59 ++ .../flink/table/expressions/collection.scala | 235 ++ .../flink/table/expressions/comparison.scala | 242 ++ .../apache/flink/table/expressions/composite.scala | 0 
.../flink/table/expressions/fieldExpression.scala | 253 +++ .../flink/table/expressions/hashExpressions.scala | 124 +++ .../apache/flink/table/expressions/literals.scala | 139 .../org/apache/flink/table/expressions/logic.scala | 109 +++ .../flink/table/expressions/mathExpressions.scala | 532 + .../apache/flink/table/expressions/ordering.scala | 0 .../flink/table/expressions/overOffsets.scala | 0 .../apache/flink/table/expressions/package.scala | 0 ...perties.scala => plannerWindowProperties.scala} | 24 +- .../table/expressions/stringExpressions.scala | 585 ++ .../apache/flink/table/expressions/subquery.scala | 95 +++ .../apache/flink/table/expressions/symbols.scala | 0 .../org/apache/flink/table/expressions/time.scala | 369 + .../flink/table/expressions/windowProperties.scala | 73 +- .../functions/utils/UserDefinedFunctionUtils.scala | 8 + .../org/apache/flink/table/plan/TreeNode.scala | 115 +++ .../flink/table/plan/logical/groupWindows.scala| 8 +- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 4 +- .../plan/metadata/FlinkRelMdUniqueGroups.scala | 4 +- .../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 4 +- .../nodes/calcite/LogicalWindowAggregate.scala | 8 +- .../table/plan/nodes/calcite/WindowAggregate.scala | 6 +- .../logical/FlinkLogicalWindowAggregate.scala | 4 +- .../batch/BatchExecHashWindowAggregate.scala | 4 +- .../batch/BatchExecHashWindowAggregateBase.scala | 4 +- .../batch/BatchExecLocalHashWindowAggregate.scala | 4 +- .../batch/BatchExecLocalSortWindowAggregate.scala | 4 +- .../batch/BatchExecSortWindowAggregate.scala | 4 +- .../batch/BatchExecSortWindowAggregateBase.scala | 4 +- .../batch/BatchExecWindowAggregateBase.scala | 6 +- .../stream/StreamExecGroupWindowAggregate.scala| 6 +- .../logical/LogicalWindowAggregateRuleBase.scala | 8 +- .../plan/rules/logical/WindowPropertiesRule.scala | 20 +- .../flink/table/plan/util/AggregateUtil.scala | 20 +- .../flink/table/plan/util/FlinkRelMdUtil.scala | 4 +- 
.../flink/table/plan/util/RelExplainUtil.scala | 4 +- .../flink/table/plan/util/RexNodeExtractor.scala | 9 +- .../flink/table/sources/TableSourceUtil.scala | 25 +- .../table/sources/tsextractors/ExistingField.scala | 4 +- .../flink/table/typeutils/TypeInfoCheckUtils.scala | 277 +++ .../flink/table/validate/ValidationResult.scala| 0 .../flink/table/expressions/KeywordParseTest.scala | 62 ++ .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 16 +- .../table/plan/util/RexNodeExtractorTest.scala | 195 ++--- 67 files changed, 6114 insertions(+), 254 deletions(-) create mode 100644 flink-table/flink-t
[flink] 01/02: [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflit
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit d1b22b56a92eef26998555dc35371447593afc8c Author: JingsongLi AuthorDate: Tue Jul 2 09:56:38 2019 +0800 [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflit --- .../flink/table/calcite/FlinkRelBuilder.scala | 4 ++-- .../codegen/agg/AggsHandlerCodeGenerator.scala | 14 ++-- .../agg/batch/HashWindowCodeGenerator.scala| 4 ++-- .../agg/batch/SortWindowCodeGenerator.scala| 4 ++-- .../codegen/agg/batch/WindowCodeGenerator.scala| 4 ++-- ...nce.scala => ExestingFieldFieldReference.scala} | 2 +- ...perties.scala => plannerWindowProperties.scala} | 24 - .../flink/table/plan/logical/groupWindows.scala| 8 +++ .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 4 ++-- .../plan/metadata/FlinkRelMdUniqueGroups.scala | 4 ++-- .../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 4 ++-- .../nodes/calcite/LogicalWindowAggregate.scala | 8 +++ .../table/plan/nodes/calcite/WindowAggregate.scala | 6 +++--- .../logical/FlinkLogicalWindowAggregate.scala | 4 ++-- .../batch/BatchExecHashWindowAggregate.scala | 4 ++-- .../batch/BatchExecHashWindowAggregateBase.scala | 4 ++-- .../batch/BatchExecLocalHashWindowAggregate.scala | 4 ++-- .../batch/BatchExecLocalSortWindowAggregate.scala | 4 ++-- .../batch/BatchExecSortWindowAggregate.scala | 4 ++-- .../batch/BatchExecSortWindowAggregateBase.scala | 4 ++-- .../batch/BatchExecWindowAggregateBase.scala | 6 +++--- .../stream/StreamExecGroupWindowAggregate.scala| 6 +++--- .../logical/LogicalWindowAggregateRuleBase.scala | 8 +++ .../plan/rules/logical/WindowPropertiesRule.scala | 20 ++--- .../flink/table/plan/util/AggregateUtil.scala | 20 - .../flink/table/plan/util/FlinkRelMdUtil.scala | 4 ++-- .../flink/table/plan/util/RelExplainUtil.scala | 4 ++-- .../flink/table/sources/TableSourceUtil.scala | 25 +++--- 
.../table/sources/tsextractors/ExistingField.scala | 4 ++-- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 16 +++--- 30 files changed, 120 insertions(+), 111 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala index 4683424..8b5cf8a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.calcite import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory} -import org.apache.flink.table.expressions.WindowProperty +import org.apache.flink.table.expressions.PlannerWindowProperty import org.apache.flink.table.operations.QueryOperation import org.apache.flink.table.plan.QueryOperationConverter import org.apache.flink.table.runtime.rank.{RankRange, RankType} @@ -111,7 +111,7 @@ object FlinkRelBuilder { * * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]]. 
*/ - case class NamedWindowProperty(name: String, property: WindowProperty) + case class PlannerNamedWindowProperty(name: String, property: PlannerWindowProperty) def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() { def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder = diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala index f77810b..bf40b04 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala @@ -58,7 +58,7 @@ class AggsHandlerCodeGenerator( /** window properties like window_start and window_end, only used in window aggregates */ private var namespaceClassName: String = _ - private var windowProperties: Seq[WindowProperty] = Seq() + private var windowProperties: Seq[PlannerWindowProperty] = Seq() private var hasNamespace: Boolean = false /** Aggregates informations */ @@ -182,7 +182,7 @@ class AggsHandlerCodeGenerator( * Adds window properties such as window_start, window_end */ private def initialWindowProperties( - windowProperties: Seq[WindowProperty], + windowProperties: Seq[PlannerWin
[flink] branch master updated: [hotfix][docs] remove duplicate `to` in state doc
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d81ac48 [hotfix][docs] remove duplicate `to` in state doc d81ac48 is described below commit d81ac4899a1072ef38103572c6f7e459c94fa895 Author: sunjincheng121 AuthorDate: Wed Jul 3 13:55:23 2019 +0800 [hotfix][docs] remove duplicate `to` in state doc --- docs/ops/state/large_state_tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md index a98bb4a..e90880b 100644 --- a/docs/ops/state/large_state_tuning.md +++ b/docs/ops/state/large_state_tuning.md @@ -108,7 +108,7 @@ impact. For state to be snapshotted asynchronsously, you need to use a state backend which supports asynchronous snapshotting. Starting from Flink 1.3, both RocksDB-based as well as heap-based state backends (`filesystem`) support asynchronous -snapshotting and use it by default. This applies to to both managed operator state as well as managed keyed state (incl. timers state). +snapshotting and use it by default. This applies to both managed operator state as well as managed keyed state (incl. timers state). Note *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
[flink-web] branch asf-site updated: Rebuild website
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 947ce88 Rebuild website 947ce88 is described below commit 947ce88b7811825774d6681b535f483fabc14cfd Author: sunjincheng121 AuthorDate: Wed Jul 3 10:17:39 2019 +0800 Rebuild website --- .jekyll-metadata | Bin 84463 -> 92656 bytes content/zh/flink-architecture.html | 49 ++--- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/.jekyll-metadata b/.jekyll-metadata index bba9081..74b3891 100644 Binary files a/.jekyll-metadata and b/.jekyll-metadata differ diff --git a/content/zh/flink-architecture.html b/content/zh/flink-architecture.html index c39218e..1739ba3 100644 --- a/content/zh/flink-architecture.html +++ b/content/zh/flink-architecture.html @@ -182,9 +182,9 @@ -Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale. +Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 -Here, we explain important aspects of Flink’s architecture. +接下来,我们来介绍一下 Flink 架构中的重要方面。 -Process Unbounded and Bounded Data +处理无界和有界数据 -Any kind of data is produced as a stream of events. Credit card transactions, sensor measurements, machine logs, or user interactions on a website or mobile application, all of these data are generated as a stream. +任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。 -Data can be processed as unbounded or bounded streams. +数据可以被作为 无界 或者 有界 流来处理。 -Unbounded streams have a start but no defined end. They do not terminate and provide data as it is generated. 
Unbounded streams must be continuously processed, i.e., events must be promptly handled after they have been ingested. It is not possible to wait for all input data to arrive because the input is unbounded and will not be complete at any point in time. Processing unbounded data often requires that events are ingested in a specific order, such as the order [...] +无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。 -Bounded streams have a defined start and end. Bounded streams can be processed by ingesting all data before performing any computations. Ordered ingestion is not required to process bounded streams because a bounded data set can always be sorted. Processing of bounded streams is also known as batch processing. +有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理 @@ -211,26 +211,26 @@ -Apache Flink excels at processing unbounded and bounded data sets. Precise control of time and state enable Flink’s runtime to run any kind of application on unbounded streams. Bounded streams are internally processed by algorithms and data structures that are specifically designed for fixed sized data sets, yielding excellent performance. +Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。 -Convince yourself by exploring the use cases that have been built on top of Flink. +通过探索 Flink 之上构建的 用例 来加深理解。 -Deploy Applications Anywhere +部署应用到任意地方 -Apache Flink is a distributed system and requires compute resources in order to execute applications. Flink integrates with all common cluster resource managers such as https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html";>Hadoop YARN, https://mesos.apache.org";>Apache Mesos, and https://kubernetes.io/";>Kubernetes but can also be setup to run as a stand-alone cluster. 
+Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html";>Hadoop YARN、 https://mesos.apache.org";>Apache Mesos 和 https://kubernetes.io/";>Kubernetes,但同时也可以作为独立集群运行。 -Flink is designed to work well each of the previously listed resource managers. This is achieved by resource-manager-specific deployment modes that allow Flink to interact with each resource manager in its idiomatic way. +Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。 -When deploying a Flink application, Flink automatically identifies the required resources based on the application’s configured parallelism and requests them from the resource manager. In case of a failure, Flink replaces the failed container by requesting new resources. All communication to submit or control an application happens via REST calls. This eases the integration of Flink in many environmen
[flink-web] branch asf-site updated: [FLINK-11561][docs-zh] Translate "Flink Architecture" page into Chinese
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 5a2e8a1 [FLINK-11561][docs-zh] Translate "Flink Architecture" page into Chinese 5a2e8a1 is described below commit 5a2e8a1dae1c0567dcdef94ed32fe94b87441845 Author: tom_gong AuthorDate: Fri May 10 19:02:13 2019 +0800 [FLINK-11561][docs-zh] Translate "Flink Architecture" page into Chinese This close #214 --- flink-architecture.zh.md | 49 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/flink-architecture.zh.md b/flink-architecture.zh.md index 580ab30..e3935e0 100644 --- a/flink-architecture.zh.md +++ b/flink-architecture.zh.md @@ -16,9 +16,9 @@ title: "Apache Flink 是什么?" -Apache Flink is a framework and distributed processing engine for stateful computations over *unbounded and bounded* data streams. Flink has been designed to run in *all common cluster environments*, perform computations at *in-memory speed* and at *any scale*. +Apache Flink 是一个框架和分布式处理引擎,用于在*无边界和有边界*数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 -Here, we explain important aspects of Flink's architecture. +接下来,我们来介绍一下 Flink 架构中的重要方面。 -## Process Unbounded and Bounded Data +## 处理无界和有界数据 -Any kind of data is produced as a stream of events. Credit card transactions, sensor measurements, machine logs, or user interactions on a website or mobile application, all of these data are generated as a stream. +任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。 -Data can be processed as *unbounded* or *bounded* streams. +数据可以被作为 *无界* 或者 *有界* 流来处理。 -1. **Unbounded streams** have a start but no defined end. They do not terminate and provide data as it is generated. Unbounded streams must be continuously processed, i.e., events must be promptly handled after they have been ingested. 
It is not possible to wait for all input data to arrive because the input is unbounded and will not be complete at any point in time. Processing unbounded data often requires that events are ingested in a specific order, such as the order in which events o [...] +1. **无界流** 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。 -2. **Bounded streams** have a defined start and end. Bounded streams can be processed by ingesting all data before performing any computations. Ordered ingestion is not required to process bounded streams because a bounded data set can always be sorted. Processing of bounded streams is also known as batch processing. +2. **有界流** 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理 -**Apache Flink excels at processing unbounded and bounded data sets.** Precise control of time and state enable Flink's runtime to run any kind of application on unbounded streams. Bounded streams are internally processed by algorithms and data structures that are specifically designed for fixed sized data sets, yielding excellent performance. +**Apache Flink 擅长处理无界和有界数据集** 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。 -Convince yourself by exploring the [use cases]({{ site.baseurl }}/usecases.html) that have been built on top of Flink. +通过探索 Flink 之上构建的 [用例]({{ site.baseurl }}/zh/usecases.html) 来加深理解。 -## Deploy Applications Anywhere +## 部署应用到任意地方 -Apache Flink is a distributed system and requires compute resources in order to execute applications. Flink integrates with all common cluster resource managers such as [Hadoop YARN](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html), [Apache Mesos](https://mesos.apache.org), and [Kubernetes](https://kubernetes.io/) but can also be setup to run as a stand-alone cluster. 
+Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 [Hadoop YARN](https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html)、 [Apache Mesos](https://mesos.apache.org) 和 [Kubernetes](https://kubernetes.io/),但同时也可以作为独立集群运行。 -Flink is designed to work well each of the previously listed resource managers. This is achieved by resource-manager-specific deployment modes that allow Flink to interact with each resource manager in its idiomatic way. +Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。 -When deploying a Flink application, Flink automatically identifies the required resources based on the application's configured parallelism and requests them from the resource manager. In case of a failure, Flink replaces the failed container by requesting new resources. All communication to submit or control an application happens via REST calls. This eases the integration
[flink] 03/05: [FLINK-12946][docs-zh] Translate "Apache NiFi Connector" page into Chinese
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5caed717570e14e0d45c49eccf409c3170034af1 Author: aloys AuthorDate: Sun Jun 23 00:54:09 2019 +0800 [FLINK-12946][docs-zh] Translate "Apache NiFi Connector" page into Chinese This closes #8838 --- docs/dev/connectors/nifi.zh.md | 43 +- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/docs/dev/connectors/nifi.zh.md b/docs/dev/connectors/nifi.zh.md index 97fd831..114092f 100644 --- a/docs/dev/connectors/nifi.zh.md +++ b/docs/dev/connectors/nifi.zh.md @@ -1,5 +1,5 @@ --- -title: "Apache NiFi Connector" +title: "Apache NiFi 连接器" nav-title: NiFi nav-parent_id: connectors nav-pos: 7 @@ -23,9 +23,8 @@ specific language governing permissions and limitations under the License. --> -This connector provides a Source and Sink that can read from and write to -[Apache NiFi](https://nifi.apache.org/). To use this connector, add the -following dependency to your project: +[Apache NiFi](https://nifi.apache.org/) 连接器提供了可以读取和写入的 Source 和 Sink。 +使用这个连接器,需要在工程中添加下面的依赖: {% highlight xml %} @@ -35,30 +34,23 @@ following dependency to your project: {% endhighlight %} -Note that the streaming connectors are currently not part of the binary -distribution. See -[here]({{site.baseurl}}/dev/projectsetup/dependencies.html) -for information about how to package the program with the libraries for -cluster execution. +注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。 - Installing Apache NiFi + 安装 Apache NiFi -Instructions for setting up a Apache NiFi cluster can be found -[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi). 
+安装 Apache NiFi 集群请参考 [这里](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi)。 Apache NiFi Source -The connector provides a Source for reading data from Apache NiFi to Apache Flink. +该连接器提供了一个 Source 可以用来从 Apache NiFi 读取数据到 Apache Flink。 -The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi. +`NiFiSource(…)` 类有两个构造方法。 -- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` given the client's SiteToSiteConfig and a - default wait time of 1000 ms. +- `NiFiSource(SiteToSiteConfig config)` - 构造一个 `NiFiSource(…)` ,需要指定参数 SiteToSiteConfig ,采用默认的等待时间 1000 ms。 -- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's - SiteToSiteConfig and the specified wait time (in milliseconds). +- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - 构造一个 `NiFiSource(…)`,需要指定参数 SiteToSiteConfig 和等待时间(单位为毫秒)。 -Example: +示例: @@ -89,18 +81,17 @@ val nifiSource = new NiFiSource(clientConfig) -Here data is read from the Apache NiFi Output Port called "Data for Flink" which is part of Apache NiFi -Site-to-site protocol configuration. +数据从 Apache NiFi Output Port 读取,Apache NiFi Output Port 也被称为 "Data for Flink",是 Apache NiFi Site-to-site 协议配置的一部分。 Apache NiFi Sink -The connector provides a Sink for writing data from Apache Flink to Apache NiFi. +该连接器提供了一个 Sink 可以用来把 Apache Flink 的数据写入到 Apache NiFi。 -The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`. +`NiFiSink(…)` 类只有一个构造方法。 -- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi. +- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` 构造一个 `NiFiSink(…)`,需要指定 `SiteToSiteConfig` 和 `NiFiDataPacketBuilder` 参数 ,`NiFiDataPacketBuilder` 可以将Flink数据转化成可以被NiFi识别的 `NiFiDataPacket`. 
-Example: +示例: @@ -135,6 +126,6 @@ streamExecEnv.addSink(nifiSink) -More information about [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site) +更多关于 [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol 的信息请参考 [这里](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site)。 {% top %}
[flink] 04/05: [FLINK-12943][docs-zh] Translate "HDFS Connector" page into Chinese
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 345ac8868b705b7c9be9bb70e6fb54d1d15baa9b Author: aloys AuthorDate: Wed Jun 26 01:22:22 2019 +0800 [FLINK-12943][docs-zh] Translate "HDFS Connector" page into Chinese This closes #8897 --- docs/dev/connectors/filesystem_sink.zh.md | 80 --- 1 file changed, 31 insertions(+), 49 deletions(-) diff --git a/docs/dev/connectors/filesystem_sink.zh.md b/docs/dev/connectors/filesystem_sink.zh.md index f9a828d..54b0c64 100644 --- a/docs/dev/connectors/filesystem_sink.zh.md +++ b/docs/dev/connectors/filesystem_sink.zh.md @@ -1,5 +1,5 @@ --- -title: "HDFS Connector" +title: "HDFS 连接器" nav-title: Rolling File Sink nav-parent_id: connectors nav-pos: 5 @@ -23,9 +23,8 @@ specific language governing permissions and limitations under the License. --> -This connector provides a Sink that writes partitioned files to any filesystem supported by -[Hadoop FileSystem](http://hadoop.apache.org). To use this connector, add the -following dependency to your project: +这个连接器可以向所有 [Hadoop FileSystem](http://hadoop.apache.org) 支持的文件系统写入分区文件。 +使用前,需要在工程里添加下面的依赖: {% highlight xml %} @@ -35,16 +34,11 @@ following dependency to your project: {% endhighlight %} -Note that the streaming connectors are currently not part of the binary -distribution. See -[here]({{site.baseurl}}/dev/projectsetup/dependencies.html) -for information about how to package the program with the libraries for -cluster execution. +注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。 - Bucketing File Sink + 分桶文件 Sink -The bucketing behaviour as well as the writing can be configured but we will get to that later. 
-This is how you can create a bucketing sink which by default, sinks to rolling files that are split by time: +关于分桶的配置我们后面会有讲述,这里先创建一个分桶 sink,默认情况下这个 sink 会将数据写入到按照时间切分的滚动文件中: @@ -65,40 +59,30 @@ input.addSink(new BucketingSink[String]("/base/path")) -The only required parameter is the base path where the buckets will be -stored. The sink can be further configured by specifying a custom bucketer, writer and batch size. - -By default the bucketing sink will split by the current system time when elements arrive and will -use the datetime pattern `"-MM-dd--HH"` to name the buckets. This pattern is passed to -`DateTimeFormatter` with the current system time and JVM's default timezone to form a bucket path. -Users can also specify a timezone for the bucketer to format bucket path. A new bucket will be created -whenever a new date is encountered. For example, if you have a pattern that contains minutes as the -finest granularity you will get a new bucket every minute. Each bucket is itself a directory that -contains several part files: each parallel instance of the sink will create its own part file and -when part files get too big the sink will also create a new part file next to the others. When a -bucket becomes inactive, the open part file will be flushed and closed. A bucket is regarded as -inactive when it hasn't been written to recently. By default, the sink checks for inactive buckets -every minute, and closes any buckets which haven't been written to for over a minute. This -behaviour can be configured with `setInactiveBucketCheckInterval()` and -`setInactiveBucketThreshold()` on a `BucketingSink`. - -You can also specify a custom bucketer by using `setBucketer()` on a `BucketingSink`. If desired, -the bucketer can use a property of the element or tuple to determine the bucket directory. - -The default writer is `StringWriter`. This will call `toString()` on the incoming elements -and write them to part files, separated by newline. 
To specify a custom writer use `setWriter()` -on a `BucketingSink`. If you want to write Hadoop SequenceFiles you can use the provided -`SequenceFileWriter` which can also be configured to use compression. - -There are two configuration options that specify when a part file should be closed -and a new one started: +初始化时只需要一个参数,这个参数表示分桶文件存储的路径。分桶 sink 可以通过指定自定义的 bucketer、 writer 和 batch 值进一步配置。 + +默认情况下,当数据到来时,分桶 sink 会按照系统时间对数据进行切分,并以 `"-MM-dd--HH"` 的时间格式给每个桶命名。然后 +`DateTimeFormatter` 按照这个时间格式将当前系统时间以 JVM 默认时区转换成分桶的路径。用户可以自定义时区来生成 +分桶的路径。每遇到一个新的日期都会产生一个新的桶。例如,如果时间的格式以分钟为粒度,那么每分钟都会产生一个桶。每个桶都是一个目录, +目录下包含了几个部分文件(part files):每个 sink 的并发实例都会创建一个属于自己的部分文件,当这些文件太大的时候,sink 会产生新的部分文件。 +当一个桶不再活跃时,打开的部分文件会刷盘并且关闭。如果一个桶最近一段时间都没有写入,那么这个桶被认为是不活跃的。sink 默认会每分钟 +检查不活跃的桶、关闭那些超过一分钟没有写入的桶。这些行为可以通过 `BucketingSink` 的 `setInactiveBucketCheckInterval()` +和 `setInactiveBucketThreshold()` 进行设置。 + +可以调用`BucketingSink` 的 `setBucketer()` 方法指定自定义的 bucketer,如果需要的话,也可以使用一个元素或者元组属性来决定桶的路径。 + +默认的 writer 是 `StringWriter`。数据到达时,通过 `toString()` 方法得到内容,内容以换行符分隔,`StringWriter` 将数据 +内容写入部分文件。可以通过 `BucketingSink` 的 `setWriter()`
[flink] 01/05: [FLINK-12945][docs-zh] Translate "RabbitMQ Connector" page into Chinese
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6bf207166a5ca09c259d854989ccff18687745bc Author: aloys AuthorDate: Sun Jun 23 20:19:26 2019 +0800 [FLINK-12945][docs-zh] Translate "RabbitMQ Connector" page into Chinese This closes #8843 --- docs/dev/connectors/rabbitmq.md| 10 +++--- docs/dev/connectors/rabbitmq.zh.md | 73 ++ 2 files changed, 32 insertions(+), 51 deletions(-) diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md index 838db2a..a3a56a8 100644 --- a/docs/dev/connectors/rabbitmq.md +++ b/docs/dev/connectors/rabbitmq.md @@ -23,7 +23,7 @@ specific language governing permissions and limitations under the License. --> -# License of the RabbitMQ Connector +## License of the RabbitMQ Connector Flink's RabbitMQ connector defines a Maven dependency on the "RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). @@ -35,7 +35,7 @@ Users that create and publish derivative work based on Flink's RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). -# RabbitMQ Connector +## RabbitMQ Connector This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project: @@ -49,10 +49,10 @@ This connector provides access to data streams from [RabbitMQ](http://www.rabbit Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/dev/projectsetup/dependencies.html). 
- Installing RabbitMQ +### Installing RabbitMQ Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched. - RabbitMQ Source +### RabbitMQ Source This connector provides a `RMQSource` class to consume messages from a RabbitMQ queue. This source provides three different levels of guarantees, depending @@ -131,7 +131,7 @@ val stream = env - RabbitMQ Sink +### RabbitMQ Sink This connector provides a `RMQSink` class for sending messages to a RabbitMQ queue. Below is a code example for setting up a RabbitMQ sink. diff --git a/docs/dev/connectors/rabbitmq.zh.md b/docs/dev/connectors/rabbitmq.zh.md index 838db2a..e213d3f 100644 --- a/docs/dev/connectors/rabbitmq.zh.md +++ b/docs/dev/connectors/rabbitmq.zh.md @@ -1,5 +1,5 @@ --- -title: "RabbitMQ Connector" +title: "RabbitMQ 连接器" nav-title: RabbitMQ nav-parent_id: connectors nav-pos: 6 @@ -23,21 +23,17 @@ specific language governing permissions and limitations under the License. --> -# License of the RabbitMQ Connector +## RabbitMQ 连接器的许可证 -Flink's RabbitMQ connector defines a Maven dependency on the -"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). +Flink 的 RabbitMQ 连接器依赖了 "RabbitMQ AMQP Java Client",它基于三种协议下发行:Mozilla Public License 1.1 ("MPL")、GNU General Public License version 2 ("GPL") 和 Apache License version 2 ("ASL")。 -Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client" -nor packages binaries from the "RabbitMQ AMQP Java Client". 
+Flink 自身既没有复用 "RabbitMQ AMQP Java Client" 的代码,也没有将 "RabbitMQ AMQP Java Client" 打二进制包。 -Users that create and publish derivative work based on Flink's -RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") -must be aware that this may be subject to conditions declared in the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). +如果用户发布的内容是基于 Flink 的 RabbitMQ 连接器的(进而重新发布了 "RabbitMQ AMQP Java Client" ),那么一定要注意这可能会受到 Mozilla Public License 1.1 ("MPL")、GNU General Public License version 2 ("GPL")、Apache License version 2 ("ASL") 协议的限制. -# RabbitMQ Connector +## RabbitMQ 连接器 -This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project: +这个连接器可以访问 [RabbitMQ](http://www.rabbitmq.com/) 的数据流。使用这个连接器,需要在工程里添加下面的依赖: {% highlight xml %} @@ -47,44 +43,30 @@ This connector provides access to data streams from [RabbitMQ](http://www.rabbit {% endhighlight %} -Note that the streaming connectors are currently not part of the binary distribution. See linking with them fo
[flink] 05/05: [FLINK-12944][docs-zh] Translate "Streaming File Sink" page into Chinese
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 29a0e8e5e7d69b0450ac3865c25df0e5d75d758b Author: aloys AuthorDate: Fri Jun 28 00:39:56 2019 +0800 [FLINK-12944][docs-zh] Translate "Streaming File Sink" page into Chinese This closes #8918 --- docs/dev/connectors/streamfile_sink.zh.md | 93 --- 1 file changed, 35 insertions(+), 58 deletions(-) diff --git a/docs/dev/connectors/streamfile_sink.zh.md b/docs/dev/connectors/streamfile_sink.zh.md index 40b104b..79586ad 100644 --- a/docs/dev/connectors/streamfile_sink.zh.md +++ b/docs/dev/connectors/streamfile_sink.zh.md @@ -23,30 +23,22 @@ specific language governing permissions and limitations under the License. --> -This connector provides a Sink that writes partitioned files to filesystems -supported by the [Flink `FileSystem` abstraction]({{ site.baseurl}}/ops/filesystems/index.html). +这个连接器提供了一个 Sink 来将分区文件写入到支持 [Flink `FileSystem`]({{ site.baseurl}}/zh/ops/filesystems/index.html) 接口的文件系统中。 -Since in streaming the input is potentially infinite, the streaming file sink writes data -into buckets. The bucketing behaviour is configurable but a useful default is time-based -bucketing where we start writing a new bucket every hour and thus get -individual files that each contain a part of the infinite output stream. +由于在流处理中输入可能是无限的,所以流处理的文件 sink 会将数据写入到桶中。如何分桶是可以配置的,一种有效的默认 +策略是基于时间的分桶,这种策略每个小时写入一个新的桶,这些桶各包含了无限输出流的一部分数据。 -Within a bucket, we further split the output into smaller part files based on a -rolling policy. This is useful to prevent individual bucket files from getting -too big. This is also configurable but the default policy rolls files based on -file size and a timeout, *i.e* if no new data was written to a part file. 
+在一个桶内部,会进一步将输出基于滚动策略切分成更小的文件。这有助于防止桶文件变得过大。滚动策略也是可以配置的,默认 +策略会根据文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间。 -The `StreamingFileSink` supports both row-wise encoding formats and -bulk-encoding formats, such as [Apache Parquet](http://parquet.apache.org). +`StreamingFileSink` 支持行编码格式和批量编码格式,比如 [Apache Parquet](http://parquet.apache.org)。 - Using Row-encoded Output Formats + 使用行编码输出格式 -The only required configuration are the base path where we want to output our -data and an -[Encoder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/Encoder.html) -that is used for serializing records to the `OutputStream` for each file. +只需要配置一个输出路径和一个 [Encoder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/Encoder.html)。 +Encoder负责为每个文件的 `OutputStream` 序列化数据。 -Basic usage thus looks like this: +基本用法如下: @@ -84,55 +76,40 @@ input.addSink(sink) -This will create a streaming sink that creates hourly buckets and uses a -default rolling policy. The default bucket assigner is +上面的代码创建了一个按小时分桶、按默认策略滚动的 sink。默认分桶器是 [DateTimeBucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html) -and the default rolling policy is -[DefaultRollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html). -You can specify a custom +,默认滚动策略是 +[DefaultRollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html)。 +可以为 sink builder 自定义 [BucketAssigner]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html) -and -[RollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html) -on the sink builder. 
Please check out the JavaDoc for -[StreamingFileSink]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html) -for more configuration options and more documentation about the workings and -interactions of bucket assigners and rolling policies. +和 +[RollingPolicy]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)。 +更多配置操作以及分桶器和滚动策略的工作机制和相互影响请参考: +[StreamingFileSink]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html)。 - Using Bulk-encoded Output Formats + 使用批量编码输出格式 -In the above example we used an `Encoder` that can encode or serialize each -record individually. The streaming file sink also supports bulk-encoded output -formats such as [Apache Parquet](http://parquet.apache.org). To use these, -instead of `StreamingFileSink.forRowFormat()` you would use -`StreamingFileSink.forBulkFormat()` and specify a `BulkWriter.Factory`. +上面的示例使用 `Encoder` 分别序列化每一个记录。除此之外,流式文件 sink 还支持批量编码的输出格式,比如 [Apache Parquet](http://pa
[flink] 02/05: [FLINK-12938][docs-zh] Translate "Streaming Connectors" page into Chinese
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit bda485fc077453ff0a555c3f25e702bfd0a1f339 Author: aloys AuthorDate: Thu Jun 27 16:17:40 2019 +0800 [FLINK-12938][docs-zh] Translate "Streaming Connectors" page into Chinese This closes #8837 --- docs/dev/connectors/index.zh.md | 49 + 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/docs/dev/connectors/index.zh.md b/docs/dev/connectors/index.zh.md index b5405d4..aabcdbd 100644 --- a/docs/dev/connectors/index.zh.md +++ b/docs/dev/connectors/index.zh.md @@ -1,5 +1,5 @@ --- -title: "Streaming Connectors" +title: "流式连接器" nav-id: connectors nav-title: Connectors nav-parent_id: streaming @@ -28,16 +28,15 @@ under the License. * toc {:toc} -## Predefined Sources and Sinks +## 预定义的 Source 和 Sink -A few basic data sources and sinks are built into Flink and are always available. -The [predefined data sources]({{ site.baseurl }}/dev/datastream_api.html#data-sources) include reading from files, directories, and sockets, and -ingesting data from collections and iterators. -The [predefined data sinks]({{ site.baseurl }}/dev/datastream_api.html#data-sinks) support writing to files, to stdout and stderr, and to sockets. +一些比较基本的 Source 和 Sink 已经内置在 Flink 里。 +[预定义 data sources]({{ site.baseurl }}/zh/dev/datastream_api.html#data-sources) 支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。 +[预定义 data sinks]({{ site.baseurl }}/zh/dev/datastream_api.html#data-sinks) 支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。 -## Bundled Connectors +## 附带的连接器 -Connectors provide code for interfacing with various third-party systems. Currently these systems are supported: +连接器可以和多种多样的第三方系统进行交互。目前支持以下系统: * [Apache Kafka](kafka.html) (source/sink) * [Apache Cassandra](cassandra.html) (sink) @@ -48,15 +47,13 @@ Connectors provide code for interfacing with various third-party systems. 
Curren * [Apache NiFi](nifi.html) (source/sink) * [Twitter Streaming API](twitter.html) (source) -Keep in mind that to use one of these connectors in an application, additional third party -components are usually required, e.g. servers for the data stores or message queues. -Note also that while the streaming connectors listed in this section are part of the -Flink project and are included in source releases, they are not included in the binary distributions. -Further instructions can be found in the corresponding subsections. +请记住,在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列。 +要注意这些列举的连接器是 Flink 工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。 +更多说明可以参考对应的子部分。 -## Connectors in Apache Bahir +## Apache Bahir 中的连接器 -Additional streaming connectors for Flink are being released through [Apache Bahir](https://bahir.apache.org/), including: +Flink 还有些一些额外的连接器通过 [Apache Bahir](https://bahir.apache.org/) 发布, 包括: * [Apache ActiveMQ](https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/) (source/sink) * [Apache Flume](https://bahir.apache.org/docs/flink/current/flink-streaming-flume/) (sink) @@ -64,23 +61,17 @@ Additional streaming connectors for Flink are being released through [Apache Bah * [Akka](https://bahir.apache.org/docs/flink/current/flink-streaming-akka/) (sink) * [Netty](https://bahir.apache.org/docs/flink/current/flink-streaming-netty/) (source) -## Other Ways to Connect to Flink +## 连接Fink的其他方法 -### Data Enrichment via Async I/O +### 异步 I/O -Using a connector isn't the only way to get data in and out of Flink. -One common pattern is to query an external database or web service in a `Map` or `FlatMap` -in order to enrich the primary datastream. -Flink offers an API for [Asynchronous I/O]({{ site.baseurl }}/dev/stream/operators/asyncio.html) -to make it easier to do this kind of enrichment efficiently and robustly. 
+使用connector并不是唯一可以使数据进入或者流出Flink的方式。 +一种常见的模式是从外部数据库或者 Web 服务查询数据得到初始数据流,然后通过 `Map` 或者 `FlatMap` 对初始数据流进行丰富和增强。 +Flink 提供了[异步 I/O]({{ site.baseurl }}/zh/dev/stream/operators/asyncio.html) API 来让这个过程更加简单、高效和稳定。 -### Queryable State +### 可查询状态 -When a Flink application pushes a lot of data to an external data store, this -can become an I/O bottleneck. -If the data involved has many fewer reads than writes, a better approach can be -for an external application to pull from Flink the data it needs. -The [Queryable State]({{ site.baseurl }}/dev/stream/state/queryable_state.html) interface -enables this by allowing the state being managed by Flink to be queried on demand. +当 Flink 应用程序需要向外部存储推送大量数据时会导致 I/O 瓶颈问题出现。在这种场景下,如果对数据的读操作远少于写操作,那么让外部应用从 Flink 拉取所需的数据会是一种更好的方式。 +[可查询状态]({{ site.baseurl }}/zh/dev/stream/state/queryable_state.html) 接口可以实现这个功能,该接口允许被 Flink 托管的状态可以被按需查询。 {% top %}
[flink] branch master updated (686bc84 -> 29a0e8e)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 686bc84 [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini new 6bf2071 [FLINK-12945][docs-zh] Translate "RabbitMQ Connector" page into Chinese new bda485f [FLINK-12938][docs-zh] Translate "Streaming Connectors" page into Chinese new 5caed71 [FLINK-12946][docs-zh] Translate "Apache NiFi Connector" page into Chinese new 345ac88 [FLINK-12943][docs-zh] Translate "HDFS Connector" page into Chinese new 29a0e8e [FLINK-12944][docs-zh] Translate "Streaming File Sink" page into Chinese The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/dev/connectors/filesystem_sink.zh.md | 80 +++--- docs/dev/connectors/index.zh.md | 49 +++- docs/dev/connectors/nifi.zh.md| 43 ++ docs/dev/connectors/rabbitmq.md | 10 ++-- docs/dev/connectors/rabbitmq.zh.md| 73 +--- docs/dev/connectors/streamfile_sink.zh.md | 93 --- 6 files changed, 135 insertions(+), 213 deletions(-)
[flink-web] branch asf-site updated: Rebuild website
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new b28e44e Rebuild website b28e44e is described below commit b28e44efb01ee70d24899f406884d9f507c844e4 Author: sunjincheng121 AuthorDate: Wed Jul 3 09:30:26 2019 +0800 Rebuild website --- .jekyll-metadata | Bin 0 -> 84463 bytes content/blog/feed.xml| 147 +++ content/blog/index.html | 38 ++- content/blog/page2/index.html| 38 +-- content/blog/page3/index.html| 40 +-- content/blog/page4/index.html| 40 +-- content/blog/page5/index.html| 40 +-- content/blog/page6/index.html| 38 ++- content/blog/page7/index.html| 36 ++- content/blog/page8/index.html| 38 +-- content/blog/page9/index.html| 25 ++ content/contributing/contribute-code.html| 2 +- content/contributing/improve-website.html| 2 +- content/downloads.html | 28 +-- content/index.html | 8 +- content/news/2019/07/02/release-1.8.1.html | 355 +++ content/zh/contributing/improve-website.html | 2 +- content/zh/downloads.html| 32 +-- content/zh/index.html| 8 +- 19 files changed, 764 insertions(+), 153 deletions(-) diff --git a/.jekyll-metadata b/.jekyll-metadata new file mode 100644 index 000..bba9081 Binary files /dev/null and b/.jekyll-metadata differ diff --git a/content/blog/feed.xml b/content/blog/feed.xml index cfadfc2..1c51569 100644 --- a/content/blog/feed.xml +++ b/content/blog/feed.xml @@ -7,6 +7,153 @@ https://flink.apache.org/blog/feed.xml"; rel="self" type="application/rss+xml" /> +Apache Flink 1.8.1 Released +The Apache Flink community released the first bugfix version of the Apache Flink 1.8 series.
+ +This release includes more than 40 fixes and minor improvements for Flink 1.8.1. The list below includes a detailed list of all improvements, sub-tasks and bug fixes.
+ +We highly recommend all users to upgrade to Flink 1.8.1.
+ +Updated Maven dependencies:
+ ++ +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>1.8.1</version> +</dependency> +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>1.8.1</version> +</dependency> +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.11</artifactId> + <version>1.8.1</version> +</dependency>
You can find the binaries on the updated Downloads page.
+ +List of resolved issues:
+ +Sub-task +
++
+ +- [FLINK-10921;] - Prioritize shard consumers in Kinesis Consumer by event time +
+- [FLINK-12617;] - StandaloneJobClusterEntrypoint should default to random JobID for non-HA setups +
+Bug +
++
- [FLINK-9445;] - scala-shell uses plain java command +
+- [FLINK-10455;] - Potential Kafka producer leak in case of failures +
+- [FLINK-10941;] - Slots prematurely released which still contain unconsumed data +
+- [FLINK-11059;] - JobMaster may continue using an invalid slot if releasing idle slot meet a timeout +
+- [FLINK-11107;] - Avoid me
[flink-web] branch asf-site updated: Add Apache Flink release 1.8.1
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 70bcf43 Add Apache Flink release 1.8.1 70bcf43 is described below commit 70bcf439500ba2565a027f2c137101268bf62027 Author: sunjincheng121 AuthorDate: Mon Jun 17 16:04:54 2019 +0800 Add Apache Flink release 1.8.1 --- _config.yml| 54 ++--- _posts/2019-07-02-release-1.8.1.md | 152 + downloads.md | 2 +- downloads.zh.md| 2 +- 4 files changed, 181 insertions(+), 29 deletions(-) diff --git a/_config.yml b/_config.yml index a209cbd..11eea16 100644 --- a/_config.yml +++ b/_config.yml @@ -9,7 +9,7 @@ url: https://flink.apache.org DOCS_BASE_URL: https://ci.apache.org/projects/flink/ -FLINK_VERSION_STABLE: 1.8.0 +FLINK_VERSION_STABLE: 1.8.1 FLINK_VERSION_STABLE_SHORT: 1.8 FLINK_VERSION_LATEST: 1.9-SNAPSHOT @@ -55,23 +55,23 @@ flink_releases: - version_short: 1.8 binary_release: - name: "Apache Flink 1.8.0" + name: "Apache Flink 1.8.1" scala_211: - id: "180-download_211" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz"; - asc_url: "https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz.asc"; - sha512_url: "https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz.sha512"; + id: "181-download_211" + url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.11.tgz"; + asc_url: "https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.11.tgz.asc"; + sha512_url: "https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.11.tgz.sha512"; scala_212: - id: "180-download_212" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz"; - asc_url: "https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz.asc"; - sha512_url: 
"https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz.sha512"; + id: "181-download_212" + url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.12.tgz"; + asc_url: "https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.12.tgz.asc"; + sha512_url: "https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-bin-scala_2.12.tgz.sha512"; source_release: - name: "Apache Flink 1.8.0" - id: "180-download-source" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.0/flink-1.8.0-src.tgz"; - asc_url: "https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-src.tgz.asc"; - sha512_url: "https://www.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-src.tgz.sha512"; + name: "Apache Flink 1.8.1" + id: "181-download-source" + url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.8.1/flink-1.8.1-src.tgz"; + asc_url: "https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-src.tgz.asc"; + sha512_url: "https://www.apache.org/dist/flink/flink-1.8.1/flink-1.8.1-src.tgz.sha512"; optional_components: - name: "Pre-bundled Hadoop 2.4.1" @@ -109,26 +109,26 @@ flink_releases: name: "Avro SQL Format" category: "SQL Formats" scala_dependent: false - id: 180-sql-format-avro - url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.0/flink-avro-1.8.0.jar - asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.0/flink-avro-1.8.0.jar.asc - sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.0/flink-avro-1.8.0.jar.sha1 + id: 181-sql-format-avro + url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.1/flink-avro-1.8.1.jar + asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.1/flink-avro-1.8.1.jar.asc + sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.8.1/flink-avro-1.8.1.jar.sha1 - name: "CSV SQL Format" category: "SQL Formats" scala_dependent: false - id: 180-sql-format-csv - 
url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.0/flink-csv-1.8.0.jar - asc_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.0/flink-csv-1.8.0.jar.asc - sha_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.8.0
[flink] branch master updated: [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 686bc84 [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini 686bc84 is described below commit 686bc841398dd14f054df8bf97c6d9ef8d8d99d9 Author: Dian Fu AuthorDate: Fri Jun 28 20:57:04 2019 +0800 [hotfix][python] Remove the redundant 'python setup.py install' from tox.ini This close #8947 --- flink-python/tox.ini | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-python/tox.ini b/flink-python/tox.ini index c475adf..e0a6c29 100644 --- a/flink-python/tox.ini +++ b/flink-python/tox.ini @@ -30,7 +30,6 @@ deps = pytest commands = python --version -python setup.py install --force pytest bash ./dev/run_pip_test.sh
[flink] branch master updated: [FLINK-13048][hive] support decimal in Flink's integration with Hive user defined functions
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 64bb08d [FLINK-13048][hive] support decimal in Flink's integration with Hive user defined functions 64bb08d is described below commit 64bb08d776ea4d153a74bbe992b59e52c3044425 Author: bowen.li AuthorDate: Mon Jul 1 16:29:36 2019 -0700 [FLINK-13048][hive] support decimal in Flink's integration with Hive user defined functions This PR adds support for decimal in Flink's integration with Hive user defined functions. This closes #8941. --- .../functions/hive/conversion/HiveInspectors.java | 17 ++- .../table/functions/hive/HiveGenericUDAFTest.java | 21 +++ .../table/functions/hive/HiveGenericUDFTest.java | 24 +++--- .../table/functions/hive/HiveSimpleUDFTest.java| 13 4 files changed, 58 insertions(+), 17 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java index bbd0b55..0f9dd67 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; @@ -58,6 +59,7 @@ 
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspect import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector; @@ -94,6 +96,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import java.math.BigDecimal; import java.sql.Date; import java.sql.Timestamp; import java.util.ArrayList; @@ -165,7 +168,6 @@ public class HiveInspectors { case TIMESTAMP: return new JavaConstantTimestampObjectInspector((Timestamp) value); case DECIMAL: - // TODO: Needs more testing return new JavaConstantHiveDecimalObjectInspector((HiveDecimal) value); case BINARY: return new JavaConstantBinaryObjectInspector((byte[]) value); @@ -200,9 +202,9 @@ public class HiveInspectors { return o -> new HiveChar((String) o, ((CharType) dataType).getLength()); } else if (inspector instanceof HiveVarcharObjectInspector) { return o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength()); + } else if (inspector instanceof HiveDecimalObjectInspector) { + return o -> HiveDecimal.create((BigDecimal) o); } - - // TODO: handle decimal type } if (inspector instanceof ListObjectInspector) { @@ -299,9 +301,11 @@ public class HiveInspectors { HiveVarcharObjectInspector oi = (HiveVarcharObjectInspector) inspector; return oi.getPrimitiveJavaObject(data).getValue(); - } + } else if (inspector instanceof HiveDecimalObjectInspector) { + 
HiveDecimalObjectInspector oi = (HiveDecimalObjectInspector) inspector; - // TODO: handle decimal type + return oi.getPrimitiveJavaObject(data).bigDecimalValue(); + } } if (inspector instanceof ListObjectI
[flink] branch master updated: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 011006f [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive 011006f is described below commit 011006f81c11e0d0c8bb37c20e06daad1dcb7148 Author: bowen.li AuthorDate: Mon Jul 1 12:42:41 2019 -0700 [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive This PR renames the SQL CLI config key for HiveCatalog from hive-site-path to hive-conf-dir which is consistent with standard Hive conf key name. This closes #8939. --- docs/dev/table/sqlClient.md| 10 +- .../apache/flink/table/catalog/hive/HiveCatalog.java | 10 +- .../hive/descriptors/HiveCatalogDescriptor.java| 4 ++-- .../catalog/hive/descriptors/HiveCatalogValidator.java | 4 ++-- .../catalog/hive/factories/HiveCatalogFactory.java | 18 +- .../flink-sql-client/conf/sql-client-defaults.yaml | 2 +- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md index 9297b2d..cd69cce 100644 --- a/docs/dev/table/sqlClient.md +++ b/docs/dev/table/sqlClient.md @@ -228,12 +228,12 @@ catalogs: - name: catalog_1 type: hive property-version: 1 - hive-site-path: file://... + hive-conf-dir: ... - name: catalog_2 type: hive property-version: 1 default-database: mydb2# optional: name of default database of this catalog - hive-site-path: file://... # optional: path of the hive-site.xml file. (Default value is created by HiveConf) + hive-conf-dir: ... # optional: path of Hive conf directory. 
(Default value is created by HiveConf) hive-version: 1.2.1# optional: version of Hive (2.3.4 by default) {% endhighlight %} @@ -245,7 +245,7 @@ This configuration: - specifies a parallelism of 1 for queries executed in this streaming environment, - specifies an event-time characteristic, and - runs queries in the `table` result mode. -- creates two `HiveCatalog` (type: hive) named with their own default databases and specified hive site path. Hive version of the first `HiveCatalog` is `2.3.4` by default and that of the second one is specified as `1.2.1`. +- creates two `HiveCatalog` (type: hive) named with their own default databases and specified Hive conf directory. Hive version of the first `HiveCatalog` is `2.3.4` by default and that of the second one is specified as `1.2.1`. - use `catalog_1` as the current catalog of the environment upon start, and `mydb1` as the current database of the catalog. Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (*defaults environment file* using `--defaults`) as well as on a per-session basis (*session environment file* using `--environment`). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in ev [...] @@ -447,11 +447,11 @@ catalogs: property-version: 1 default-database: mydb2 hive-version: 1.2.1 - hive-site-path: + hive-conf-dir: - name: catalog_2 type: hive property-version: 1 - hive-site-path: + hive-conf-dir: {% endhighlight %} Currently Flink supports two types of catalog - `FlinkInMemoryCatalog` and `HiveCatalog`. 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 03ddceb..00e7e69 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -118,10 +118,10 @@ public class HiveCatalog extends AbstractCatalog { private HiveMetastoreClientWrapper client; - public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable URL hiveSiteUrl, String hiveVersion) { + public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable URL hiveConfDir, String hiveVersion) { this(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase, - createHiveConf(hiveSiteUrl), + createHiveConf(hiveConfDir), hiveVersion); } @@ -136,10 +136,10 @@ public class HiveCatalog extends AbstractCatalog {
[flink] branch master updated: [FLINK-13021][table][hive] unify catalog partition implementations
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 59ff00d [FLINK-13021][table][hive] unify catalog partition implementations 59ff00d is described below commit 59ff00d71d298fa61a92efa4fecd46f3cefc50f6 Author: bowen.li AuthorDate: Mon Jul 1 12:31:47 2019 -0700 [FLINK-13021][table][hive] unify catalog partition implementations This PR unifies catalog partition implementations. This closes #8926. --- .../flink/table/catalog/hive/HiveCatalog.java | 54 --- .../table/catalog/hive/HiveCatalogConfig.java | 17 ++ .../table/catalog/hive/HiveCatalogPartition.java | 61 -- .../table/catalog/hive/HivePartitionConfig.java| 17 ++ .../connectors/hive/HiveTableOutputFormatTest.java | 9 ++-- .../batch/connectors/hive/HiveTableSinkTest.java | 8 +-- .../catalog/hive/HiveCatalogHiveMetadataTest.java | 19 --- flink-python/pyflink/table/catalog.py | 26 + flink-python/pyflink/table/tests/test_catalog.py | 8 +-- ...logPartition.java => CatalogPartitionImpl.java} | 23 ++-- .../table/catalog/GenericCatalogPartition.java | 52 -- .../flink/table/catalog/CatalogTestBase.java | 5 ++ .../table/catalog/GenericInMemoryCatalogTest.java | 26 - .../flink/table/catalog/CatalogPartition.java | 7 +++ .../apache/flink/table/catalog/CatalogTest.java| 40 +++--- .../flink/table/catalog/CatalogTestUtil.java | 13 + 16 files changed, 111 insertions(+), 274 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 8659a80..03ddceb 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -29,15 +29,14 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.CatalogViewImpl; -import org.apache.flink.table.catalog.GenericCatalogPartition; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; -import org.apache.flink.table.catalog.config.CatalogTableConfig; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -483,7 +482,7 @@ public class HiveCatalog extends AbstractCatalog { if (isGeneric) { properties = retrieveFlinkProperties(properties); } - String comment = properties.remove(CatalogTableConfig.TABLE_COMMENT); + String comment = properties.remove(HiveCatalogConfig.COMMENT); // Table schema TableSchema tableSchema = @@ -515,7 +514,7 @@ public class HiveCatalog extends AbstractCatalog { Map properties = new HashMap<>(table.getProperties()); // Table comment - properties.put(CatalogTableConfig.TABLE_COMMENT, table.getComment()); + properties.put(HiveCatalogConfig.COMMENT, table.getComment()); boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC)); @@ -623,8 +622,10 @@ public class HiveCatalog extends AbstractCatalog { checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); checkNotNull(partition, 
"Partition cannot be null"); - if (!(partition instanceof HiveCatalogPartition)) { - throw new CatalogException("Currently only supports HiveCatalogPartition"); + boolean isGeneric = Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC)); + + if (isGeneric) { + throw new CatalogException("Currently only supports non-generic CatalogPartition"); } Table hiveTable = getHiveTable(tablePath); @@ -715,7 +716,14 @@ public class HiveCatalog extends AbstractCatalog { try {
[flink] branch master updated: [FLINK-13047][table] Fix the Optional.orElse() usage issue in DatabaseCalciteSchema
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3358cd3 [FLINK-13047][table] Fix the Optional.orElse() usage issue in DatabaseCalciteSchema 3358cd3 is described below commit 3358cd3b6d6ffebb313b6c02f2a53e6dbd6ec1ed Author: Xuefu Zhang AuthorDate: Mon Jul 1 15:25:44 2019 -0700 [FLINK-13047][table] Fix the Optional.orElse() usage issue in DatabaseCalciteSchema This PR fixes the Optional.orElse() usage issue in DatabaseCalciteSchema. This closes #8940. --- .../apache/flink/table/catalog/DatabaseCalciteSchema.java | 15 --- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java index e629978..95475ef 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java @@ -29,7 +29,6 @@ import org.apache.flink.table.plan.schema.TableSourceTable; import org.apache.flink.table.plan.stats.FlinkStatistic; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.sources.TableSource; -import org.apache.flink.types.Row; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.rel.type.RelProtoDataType; @@ -104,9 +103,19 @@ class DatabaseCalciteSchema implements Schema { } private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) { + TableSource tableSource; Optional tableFactory = catalog.getTableFactory(); - TableSource tableSource = tableFactory.map(tf -> ((TableSourceFactory) tf).createTableSource(tablePath, table)) - 
.orElse(TableFactoryUtil.findAndCreateTableSource(table)); + if (tableFactory.isPresent()) { + TableFactory tf = tableFactory.get(); + if (tf instanceof TableSourceFactory) { + tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table); + } else { + throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory", + catalog.getClass())); + } + } else { + tableSource = TableFactoryUtil.findAndCreateTableSource(table); + } if (!(tableSource instanceof StreamTableSource)) { throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
[flink] 03/03: [hotfix][table-api-java] Moved QueryOperation utilities to o.a.f.t.operations.utils
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6cc8e44adfaab2d6e3ae4c1992b8fa73c4066c81 Author: Dawid Wysakowicz AuthorDate: Tue Jul 2 16:46:23 2019 +0200 [hotfix][table-api-java] Moved QueryOperation utilities to o.a.f.t.operations.utils This closes #8860 --- .../flink/table/api/internal/TableEnvironmentImpl.java| 2 +- .../org/apache/flink/table/api/internal/TableImpl.java| 6 +++--- .../operations/{ => utils}/OperationExpressionsUtils.java | 3 ++- .../operations/{ => utils}/OperationTreeBuilder.java | 5 - .../{ => utils}/QueryOperationDefaultVisitor.java | 15 ++- .../utils/factories/AggregateOperationFactory.java| 2 +- .../operations/utils/factories/ColumnOperationUtils.java | 2 +- .../utils/factories/ProjectionOperationFactory.java | 4 ++-- .../apache/flink/table/plan/QueryOperationConverter.java | 2 +- .../apache/flink/table/plan/QueryOperationConverter.java | 2 +- .../apache/flink/table/api/internal/TableEnvImpl.scala| 1 + 11 files changed, 31 insertions(+), 13 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 9b04f56..e94c65a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -50,9 +50,9 @@ import org.apache.flink.table.operations.CatalogQueryOperation; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.OperationTreeBuilder; import 
org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.TableSourceQueryOperation; +import org.apache.flink.table.operations.utils.OperationTreeBuilder; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceValidation; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index 79f8502..8d33e10 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -40,10 +40,10 @@ import org.apache.flink.table.expressions.resolver.LookupCallResolver; import org.apache.flink.table.functions.TemporalTableFunction; import org.apache.flink.table.functions.TemporalTableFunctionImpl; import org.apache.flink.table.operations.JoinQueryOperation.JoinType; -import org.apache.flink.table.operations.OperationExpressionsUtils; -import org.apache.flink.table.operations.OperationExpressionsUtils.CategorizedExpressions; -import org.apache.flink.table.operations.OperationTreeBuilder; import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.utils.OperationExpressionsUtils; +import org.apache.flink.table.operations.utils.OperationExpressionsUtils.CategorizedExpressions; +import org.apache.flink.table.operations.utils.OperationTreeBuilder; import java.util.Arrays; import java.util.Collections; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java similarity index 98% rename from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java index eb2030d..9bf5b59 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.operations; +package org.apache.flink.table.operations.utils; import org.apache.flink.annotation.Internal; import org.apache.flink.table.expressions.CallExpression; @@ -30,6 +30,7 @@ import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor; import org.apache.flink.tabl
[flink] 02/03: [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1d167a3670614901e4ef011af92b4045c7eb1612 Author: Dawid Wysakowicz AuthorDate: Sat Jun 1 19:08:56 2019 +0200 [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module --- .../java/internal/StreamTableEnvironmentImpl.java | 8 +- .../flink/table/api/EnvironmentSettings.java | 7 + .../table/api/internal/TableEnvironmentImpl.java | 28 +- .../expressions/utils/ApiExpressionUtils.java | 29 +- .../operations/OperationExpressionsUtils.java | 4 +- .../table/operations/OperationTreeBuilder.java | 680 +++-- .../internal/StreamTableEnvironmentImpl.scala | 10 +- .../flink/table/expressions/ExpressionUtils.java | 8 +- .../operations/OperationTreeBuilderFactory.java| 44 -- .../flink/table/api/internal/TableEnvImpl.scala| 17 +- .../operations/OperationTreeBuilderImpl.scala | 600 -- .../api/stream/StreamTableEnvironmentTest.scala| 3 +- .../flink/table/api/stream/sql/AggregateTest.scala | 3 +- .../apache/flink/table/utils/TableTestBase.scala | 6 +- 14 files changed, 702 insertions(+), 745 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java index 6b37690..05815dd 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java @@ -85,8 +85,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple TableConfig tableConfig, StreamExecutionEnvironment executionEnvironment, Planner planner, - Executor executor) { - 
super(catalogManager, tableConfig, executor, functionCatalog, planner); + Executor executor, + boolean isStreaming) { + super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreaming); this.executionEnvironment = executionEnvironment; } @@ -119,7 +120,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple tableConfig, executionEnvironment, planner, - executor + executor, + !settings.isBatchMode() ); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index 37ba179..70b7ffd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -114,6 +114,13 @@ public class EnvironmentSettings { return builtInDatabaseName; } + /** +* Tells if the {@link TableEnvironment} should work in a batch or streaming mode. 
+*/ + public boolean isBatchMode() { + return isBatchMode; + } + @Internal public Map toPlannerProperties() { Map properties = new HashMap<>(toCommonProperties()); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 727727a..9b04f56 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -36,7 +36,6 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ExternalCatalog; import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -46,7 +45,6 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.table.descriptors.StreamTableDescriptor; import or
[flink] 01/03: [hotfix][table-planner][table-api-java] Move QueryOperation factories to table-api-java
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 39e4ad2097506f646a1c8a78f753628ad1debb82 Author: Dawid Wysakowicz AuthorDate: Tue Jul 2 14:55:29 2019 +0200 [hotfix][table-planner][table-api-java] Move QueryOperation factories to table-api-java --- .../operations/utils/factories}/AggregateOperationFactory.java | 5 - .../table/operations/utils/factories}/AliasOperationUtils.java | 3 ++- .../table/operations/utils/factories}/CalculatedTableFactory.java | 4 +++- .../table/operations/utils/factories}/ColumnOperationUtils.java| 2 +- .../table/operations/utils/factories}/JoinOperationFactory.java| 5 - .../operations/utils/factories}/ProjectionOperationFactory.java| 4 +++- .../table/operations/utils/factories}/SetOperationFactory.java | 4 +++- .../table/operations/utils/factories}/SortOperationFactory.java| 4 +++- .../apache/flink/table/operations/OperationTreeBuilderImpl.scala | 7 --- 9 files changed, 27 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java similarity index 98% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java index ac6ee30..8fef793 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.table.operations; +package org.apache.flink.table.operations.utils.factories; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -45,6 +45,9 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionRequirement; import org.apache.flink.table.functions.TableAggregateFunctionDefinition; +import org.apache.flink.table.operations.AggregateQueryOperation; +import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.WindowAggregateQueryOperation; import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LegacyTypeInformationType; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java similarity index 97% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java index c73ca47..a9a45d0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.table.operations; +package org.apache.flink.table.operations.utils.factories; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableSchema; @@ -27,6 +27,7 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.operations.QueryOperation; import java.util.List; import java.util.stream.Collectors; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java similarity index 96% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java index 2e10ee8..fe01128 100644 --- a/flink-table/flink-table-plann
[flink] branch master updated (d903480 -> 6cc8e44)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d903480 [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala new 39e4ad2 [hotfix][table-planner][table-api-java] Move QueryOperation factories to table-api-java new 1d167a3 [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module new 6cc8e44 [hotfix][table-api-java] Moved QueryOperation utilities to o.a.f.t.operations.utils The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../java/internal/StreamTableEnvironmentImpl.java | 8 +- .../flink/table/api/EnvironmentSettings.java | 7 + .../table/api/internal/TableEnvironmentImpl.java | 30 +- .../apache/flink/table/api/internal/TableImpl.java | 6 +- .../expressions/utils/ApiExpressionUtils.java | 29 +- .../table/operations/OperationTreeBuilder.java | 110 .../{ => utils}/OperationExpressionsUtils.java | 7 +- .../operations/utils/OperationTreeBuilder.java | 697 + .../{ => utils}/QueryOperationDefaultVisitor.java | 15 +- .../factories}/AggregateOperationFactory.java | 7 +- .../utils/factories}/AliasOperationUtils.java | 3 +- .../utils/factories}/CalculatedTableFactory.java | 4 +- .../utils/factories}/ColumnOperationUtils.java | 4 +- .../utils/factories}/JoinOperationFactory.java | 5 +- .../factories}/ProjectionOperationFactory.java | 8 +- .../utils/factories}/SetOperationFactory.java | 4 +- .../utils/factories}/SortOperationFactory.java | 4 +- .../internal/StreamTableEnvironmentImpl.scala | 10 +- .../flink/table/expressions/ExpressionUtils.java | 8 +- .../flink/table/plan/QueryOperationConverter.java | 2 +- .../operations/OperationTreeBuilderFactory.java| 44 -- 
.../flink/table/plan/QueryOperationConverter.java | 2 +- .../flink/table/api/internal/TableEnvImpl.scala| 18 +- .../operations/OperationTreeBuilderImpl.scala | 599 -- .../api/stream/StreamTableEnvironmentTest.scala| 3 +- .../flink/table/api/stream/sql/AggregateTest.scala | 3 +- .../apache/flink/table/utils/TableTestBase.scala | 6 +- 27 files changed, 817 insertions(+), 826 deletions(-) delete mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{ => utils}/OperationExpressionsUtils.java (98%) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{ => utils}/QueryOperationDefaultVisitor.java (73%) rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/AggregateOperationFactory.java (98%) rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/AliasOperationUtils.java (97%) rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/CalculatedTableFactory.java (96%) rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/ColumnOperationUtils.java (97%) rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/JoinOperationFactory.java (95%) rename 
flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/ProjectionOperationFactory.java (95%) rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/SetOperationFactory.java (95%) rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/SortOperationFactory.java (96%) delete mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java delete mod
[flink] branch master updated: [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d903480 [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala d903480 is described below commit d903480ef7abc95cd419e6a194db2276dd7eb4cb Author: Timo Walther AuthorDate: Tue Jul 2 16:55:08 2019 +0200 [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala This moves the Scala expression DSL to flink-table-api-scala. Users of pure table programs should define their imports like: import org.apache.flink.table.api._ TableEnvironment.create(...) Users of the DataStream API should define their imports like: import org.apache.flink.table.api._ import org.apache.flink.table.api.scala._ StreamTableEnvironment.create(...) This commit did not split the package object org.apache.flink.table.api.scala._ into two parts yet because we want to give users the chance to update their imports. This closes #8945. 
--- docs/dev/table/tableApi.md | 4 + docs/dev/table/tableApi.zh.md | 4 + flink-table/flink-table-api-scala-bridge/pom.xml | 9 +- .../flink/table/api/scala/DataSetConversions.scala | 6 +- .../table/api/scala/DataStreamConversions.scala| 6 +- .../flink/table/api/scala/TableConversions.scala | 25 +- .../org/apache/flink/table/api/scala/package.scala | 89 ++ flink-table/flink-table-api-scala/pom.xml | 47 +++ .../apache/flink/table/api}/expressionDsl.scala| 319 ++--- .../scala/org/apache/flink/table/api/package.scala | 48 .../scala/org/apache/flink/table/api/package.scala | 34 --- .../org/apache/flink/table/api/scala/package.scala | 93 -- 12 files changed, 307 insertions(+), 377 deletions(-) diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index a0bd51a..bd6bd38 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings. {% highlight java %} +import org.apache.flink.table.api._ +import org.apache.flink.table.api.java._ + // environment configuration ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); @@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. 
Table {% highlight scala %} import org.apache.flink.api.scala._ +import org.apache.flink.table.api._ import org.apache.flink.table.api.scala._ // environment configuration diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md index 7a75c0a..e4b8a55 100644 --- a/docs/dev/table/tableApi.zh.md +++ b/docs/dev/table/tableApi.zh.md @@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings. {% highlight java %} +import org.apache.flink.table.api._ +import org.apache.flink.table.api.java._ + // environment configuration ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); @@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table {% highlight scala %} import org.apache.flink.api.scala._ +import org.apache.flink.table.api._ import org.apache.flink.table.api.scala._ // environment configuration diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml b/flink-table/flink-table-api-scala-bridge/pom.xml index 2a7f2ec..f49f71c 100644 --- a/flink-table/flink-table-api-scala-bridge/pom.xml +++ b/flink-table/flink-table-api-scala-bridge/pom.xml @@ -61,19 +61,18 @@ under the License. net.alchim31.maven scala-maven-plugin - + scala-compile-first process-resources - add-source compile
[flink] 04/05: [FLINK-12883][runtime] Add getID() to ExecutionVertex
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 2f5fc239ee7712e02f3f3ebfc0a991acdcf6e3cf Author: Gary Yao AuthorDate: Wed Jun 19 14:45:53 2019 +0200 [FLINK-12883][runtime] Add getID() to ExecutionVertex --- .../org/apache/flink/runtime/executiongraph/ExecutionVertex.java | 8 1 file changed, 8 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a12d198..5fc0b72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; import org.apache.flink.util.ExceptionUtils; @@ -83,6 +84,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable priorExecutions; private final Time timeout; @@ -142,6 +145,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable
[flink] 02/05: [hotfix][runtime] Use Set instead of IdentityHashMap where possible
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit bce9a4c43fb6ff61762885cb0fbc984a7338ce19 Author: Gary Yao AuthorDate: Thu Jun 20 17:20:00 2019 +0200 [hotfix][runtime] Use Set instead of IdentityHashMap where possible Replace instances where we use an IdentityHashMap as a set with Collections.newSetFromMap() in RestartPipelinedRegionStrategy and PipelineRegionComputeUtil. --- .../failover/flip1/PipelinedRegionComputeUtil.java | 11 -- .../flip1/RestartPipelinedRegionStrategy.java | 25 +++--- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java index 291c894..b21b52f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java @@ -22,6 +22,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Map; @@ -41,8 +42,6 @@ public final class PipelinedRegionComputeUtil { return uniqueRegions(buildOneRegionForAllVertices(topology)); } - // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist) - // this helps to optimize the building performance as it uses reference equality final Map> vertexToRegion = new IdentityHashMap<>(); // iterate all the vertices which are topologically sorted @@ -105,11 +104,9 @@ public final class PipelinedRegionComputeUtil { private static Set> uniqueRegions(final Map> vertexToRegion) { // find out 
all the distinct regions - final IdentityHashMap, Object> distinctRegions = new IdentityHashMap<>(); - for (Set regionVertices : vertexToRegion.values()) { - distinctRegions.put(regionVertices, null); - } - return distinctRegions.keySet(); + final Set> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>()); + distinctRegions.addAll(vertexToRegion.values()); + return distinctRegions; } private PipelinedRegionComputeUtil() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java index 58d1675..a87764b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; @@ -50,7 +51,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { private final FailoverTopology topology; /** All failover regions. */ - private final IdentityHashMap regions; + private final Set regions; /** Maps execution vertex id to failover region. 
*/ private final Map vertexToRegionMap; @@ -80,7 +81,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) { this.topology = checkNotNull(topology); - this.regions = new IdentityHashMap<>(); + this.regions = Collections.newSetFromMap(new IdentityHashMap<>()); this.vertexToRegionMap = new HashMap<>(); this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker( resultPartitionAvailabilityChecker); @@ -100,7 +101,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { for (Set regionVertices : distinctRegions) { LOG.debug("Creating a failover region with {} vertices.", regionVertices.size()); final FailoverRegion failoverRegion = new FailoverRegion(regionVertices); - regions.put(failoverRegion, null); + regions.add(failo
[flink] 03/05: [hotfix][runtime] Remove obsolete comment from PipelinedRegionComputeUtil#uniqueRegions()
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit efebb99d5d3840047741cde4a73511252bd76e93 Author: Gary Yao AuthorDate: Thu Jun 20 17:23:21 2019 +0200 [hotfix][runtime] Remove obsolete comment from PipelinedRegionComputeUtil#uniqueRegions() --- .../executiongraph/failover/flip1/PipelinedRegionComputeUtil.java| 1 - 1 file changed, 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java index b21b52f..14d28b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java @@ -103,7 +103,6 @@ public final class PipelinedRegionComputeUtil { } private static Set> uniqueRegions(final Map> vertexToRegion) { - // find out all the distinct regions final Set> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>()); distinctRegions.addAll(vertexToRegion.values()); return distinctRegions;
[flink] 01/05: [FLINK-12883][runtime] Extract computation of pipelined regions.
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit a8b72220154d3349b5157d02aab78bf8e3b381dd Author: Gary Yao AuthorDate: Wed Jun 19 10:58:17 2019 +0200 [FLINK-12883][runtime] Extract computation of pipelined regions. --- .../failover/flip1/PipelinedRegionComputeUtil.java | 117 + .../flip1/RestartPipelinedRegionStrategy.java | 75 + 2 files changed, 119 insertions(+), 73 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java new file mode 100644 index 000..291c894 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.Set; + +/** + * Utility for computing pipelined regions. + */ +public final class PipelinedRegionComputeUtil { + + private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class); + + public static Set> computePipelinedRegions(final FailoverTopology topology) { + // currently we let a job with co-location constraints fail as one region + // putting co-located vertices in the same region with each other can be a future improvement + if (topology.containsCoLocationConstraints()) { + return uniqueRegions(buildOneRegionForAllVertices(topology)); + } + + // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist) + // this helps to optimize the building performance as it uses reference equality + final Map> vertexToRegion = new IdentityHashMap<>(); + + // iterate all the vertices which are topologically sorted + for (FailoverVertex vertex : topology.getFailoverVertices()) { + Set currentRegion = new HashSet<>(1); + currentRegion.add(vertex); + vertexToRegion.put(vertex, currentRegion); + + for (FailoverEdge inputEdge : vertex.getInputEdges()) { + if (inputEdge.getResultPartitionType().isPipelined()) { + final FailoverVertex producerVertex = inputEdge.getSourceVertex(); + final Set producerRegion = vertexToRegion.get(producerVertex); + + if (producerRegion == null) { + throw new IllegalStateException("Producer task " + producerVertex.getExecutionVertexName() + + " failover region is null while calculating failover region for the consumer task " + + vertex.getExecutionVertexName() + ". 
This should be a failover region building bug."); + } + + // check if it is the same as the producer region, if so skip the merge + // this check can significantly reduce compute complexity in All-to-All PIPELINED edge case + if (currentRegion != producerRegion) { + // merge current region and producer region + // merge the smaller region into the larger one to reduce the cost + final Set smallerSet; + final Set largerSet; +
[flink] 05/05: [FLINK-12883][runtime] Introduce PartitionReleaseStrategy
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c9aa9a170ff657198c4710706bc3802de42063ca Author: Gary Yao AuthorDate: Wed Jun 19 15:06:51 2019 +0200 [FLINK-12883][runtime] Introduce PartitionReleaseStrategy - Introduce interface PartitionReleaseStrategy. - Introduce RegionPartitionReleaseStrategy and NotReleasingPartitionReleaseStrategy implementations, which can be configured via a new config option. - Add unit tests for new classes. - Increase visibility of methods in TestingSchedulingTopology for unit tests outside of its package. --- .../flink/configuration/JobManagerOptions.java | 9 + .../runtime/executiongraph/ExecutionGraph.java | 132 +++--- .../executiongraph/ExecutionGraphBuilder.java | 6 + .../runtime/executiongraph/ExecutionVertex.java| 1 + .../failover/flip1/PipelinedRegionComputeUtil.java | 20 +- .../NotReleasingPartitionReleaseStrategy.java | 56 ++ .../partitionrelease/PartitionReleaseStrategy.java | 58 ++ .../PartitionReleaseStrategyFactoryLoader.java | 41 + .../flip1/partitionrelease/PipelinedRegion.java| 69 +++ .../PipelinedRegionConsumedBlockingPartitions.java | 51 ++ .../PipelinedRegionExecutionView.java | 64 +++ .../RegionPartitionReleaseStrategy.java| 190 +++ .../ExecutionGraphPartitionReleaseTest.java| 202 + .../RegionPartitionReleaseStrategyTest.java| 149 +++ .../PartitionReleaseStrategyFactoryLoaderTest.java | 55 ++ .../PipelinedRegionExecutionViewTest.java | 75 .../strategy/TestingSchedulingTopology.java| 14 +- 17 files changed, 1154 insertions(+), 38 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 89515fd..0f9e18d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -173,6 +173,15 @@ public class JobManagerOptions { text("'legacy': legacy scheduler"), text("'ng': new generation scheduler")) .build()); + /** +* Config parameter controlling whether partitions should already be released during the job execution. +*/ + @Documentation.ExcludeFromDocumentation("User normally should not be expected to deactivate this feature. " + + "We aim at removing this flag eventually.") + public static final ConfigOption PARTITION_RELEASE_DURING_JOB_EXECUTION = + key("jobmanager.partition.release-during-job-execution") + .defaultValue(true) + .withDescription("Controls whether partitions should already be released during the job execution."); @Documentation.ExcludeFromDocumentation("dev use only; likely temporary") public static final ConfigOption FORCE_PARTITION_RELEASE_ON_CONSUMPTION = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 514121e..ce65b68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -48,6 +48,9 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy; import 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; @@ -55,6 +58,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org
[flink] branch master updated (a0f747d -> c9aa9a1)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a0f747d [hotfix][runtime] Remove legacy NoOpIOManager class new a8b7222 [FLINK-12883][runtime] Extract computation of pipelined regions. new bce9a4c [hotfix][runtime] Use Set instead of IdentityHashMap where possible new efebb99 [hotfix][runtime] Remove obsolete comment from PipelinedRegionComputeUtil#uniqueRegions() new 2f5fc23 [FLINK-12883][runtime] Add getID() to ExecutionVertex new c9aa9a1 [FLINK-12883][runtime] Introduce PartitionReleaseStrategy The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/configuration/JobManagerOptions.java | 9 + .../runtime/executiongraph/ExecutionGraph.java | 132 +++--- .../executiongraph/ExecutionGraphBuilder.java | 6 + .../runtime/executiongraph/ExecutionVertex.java| 9 + .../failover/flip1/PipelinedRegionComputeUtil.java | 131 + .../flip1/RestartPipelinedRegionStrategy.java | 100 ++ .../NotReleasingPartitionReleaseStrategy.java | 56 ++ .../partitionrelease/PartitionReleaseStrategy.java | 58 ++ .../PartitionReleaseStrategyFactoryLoader.java}| 39 ++-- .../flip1/partitionrelease/PipelinedRegion.java| 69 +++ .../PipelinedRegionConsumedBlockingPartitions.java | 51 ++ .../PipelinedRegionExecutionView.java | 64 +++ .../RegionPartitionReleaseStrategy.java| 190 +++ .../ExecutionGraphPartitionReleaseTest.java| 202 + .../RegionPartitionReleaseStrategyTest.java| 149 +++ .../PartitionReleaseStrategyFactoryLoaderTest.java | 55 ++ .../PipelinedRegionExecutionViewTest.java | 75 .../strategy/TestingSchedulingTopology.java| 14 +- 18 files changed, 1263 insertions(+), 146 deletions(-) create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java copy flink-runtime/src/{test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java => main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java} (51%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java
[flink] 03/06: [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 423e8a8afa4fd440ba2abb6c2b535f881ef84374 Author: Zhijiang AuthorDate: Fri Jun 28 12:18:47 2019 +0800 [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown IOManager#close would ignore any exceptions internally in order not to interrupt other close operations, then IOManager#isProperlyShutDown is used for checking any exceptions during close process. We could use IOUtils#closeAll for handling all the close operations and finally throwing the suppressed exceptions to get the same effect, then isProperlyShutDown method could be removed completely. --- .../flink/runtime/io/disk/iomanager/IOManager.java | 47 - .../runtime/io/disk/iomanager/IOManagerAsync.java | 105 ++--- .../flink/runtime/io/disk/ChannelViewsTest.java| 5 +- .../runtime/io/disk/FileChannelStreamsITCase.java | 3 +- .../flink/runtime/io/disk/SpillingBufferTest.java | 5 +- .../AsynchronousBufferFileWriterTest.java | 2 +- .../iomanager/AsynchronousFileIOChannelTest.java | 2 +- .../BufferFileWriterFileSegmentReaderTest.java | 2 +- .../disk/iomanager/BufferFileWriterReaderTest.java | 2 +- .../io/disk/iomanager/IOManagerAsyncTest.java | 3 +- .../runtime/io/disk/iomanager/IOManagerITCase.java | 1 - .../runtime/io/disk/iomanager/IOManagerTest.java | 2 +- .../runtime/operators/hash/HashTableITCase.java| 5 +- .../hash/NonReusingHashJoinIteratorITCase.java | 5 +- .../operators/hash/ReOpenableHashTableITCase.java | 5 +- .../hash/ReOpenableHashTableTestBase.java | 5 +- .../hash/ReusingHashJoinIteratorITCase.java| 5 +- .../resettable/SpillingResettableIteratorTest.java | 5 +- ...pillingResettableMutableObjectIteratorTest.java | 5 +- .../AbstractSortMergeOuterJoinIteratorITCase.java | 5 +- .../sort/CombiningUnilateralSortMergerITCase.java | 5 +- .../runtime/operators/sort/ExternalSortITCase.java | 5 +- 
.../sort/ExternalSortLargeRecordsITCase.java | 5 +- .../sort/FixedLengthRecordSorterTest.java | 2 +- ...NonReusingSortMergeInnerJoinIteratorITCase.java | 5 +- .../ReusingSortMergeInnerJoinIteratorITCase.java | 5 +- .../testutils/BinaryOperatorTestBase.java | 3 +- .../operators/testutils/DriverTestBase.java| 1 - .../operators/testutils/MockEnvironment.java | 4 +- .../runtime/operators/testutils/TaskTestBase.java | 2 +- .../operators/testutils/UnaryOperatorTestBase.java | 1 - .../operators/util/HashVsSortMiniBenchmark.java| 5 +- .../runtime/taskexecutor/TaskExecutorTest.java | 1 - .../streaming/runtime/io/BufferSpillerTest.java| 2 +- ...CheckpointBarrierAlignerAlignmentLimitTest.java | 2 +- .../io/SpillingCheckpointBarrierAlignerTest.java | 2 +- .../streaming/runtime/tasks/OperatorChainTest.java | 2 +- .../runtime/tasks/StreamTaskTestHarness.java | 1 - .../util/AbstractStreamOperatorTestHarness.java| 4 +- .../flink/table/runtime/aggregate/HashAggTest.java | 5 +- .../runtime/hashtable/BinaryHashTableTest.java | 5 +- .../io/CompressedHeaderlessChannelTest.java| 6 +- .../join/Int2SortMergeJoinOperatorTest.java| 5 +- .../runtime/sort/BinaryExternalSorterTest.java | 5 +- .../runtime/sort/BufferedKVExternalSorterTest.java | 5 +- 45 files changed, 111 insertions(+), 196 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index ee54b1e..a649e42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -24,15 +24,19 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; 
import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; /** * The facade for the provided I/O manager services. @@ -82,39 +86,26 @@ public abstract class IOManager implements AutoCloseable { } /** -* Close method, marks the I/O manager as closed -* and removed all temporary files. +* Removes all temporary files. */ @Override - public void close() { - // remove all of our temp directories -
[flink] 02/06: [hotfix][runtime] IOManager implements AutoCloseable
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5809ed32f869fa880a70a774d5b8365fe59dba4a Author: Zhijiang AuthorDate: Tue Jun 18 12:09:04 2019 +0800 [hotfix][runtime] IOManager implements AutoCloseable --- .../flink/runtime/io/disk/iomanager/IOManager.java | 5 ++-- .../runtime/io/disk/iomanager/IOManagerAsync.java | 8 +++--- .../runtime/taskexecutor/TaskManagerServices.java | 2 +- .../flink/runtime/io/disk/ChannelViewsTest.java| 2 +- .../runtime/io/disk/FileChannelStreamsITCase.java | 2 +- .../runtime/io/disk/FileChannelStreamsTest.java| 12 ++-- .../io/disk/SeekableFileChannelInputViewTest.java | 6 +--- .../flink/runtime/io/disk/SpillingBufferTest.java | 2 +- .../AsynchronousBufferFileWriterTest.java | 2 +- .../iomanager/AsynchronousFileIOChannelTest.java | 19 +++-- .../BufferFileWriterFileSegmentReaderTest.java | 2 +- .../disk/iomanager/BufferFileWriterReaderTest.java | 2 +- .../io/disk/iomanager/IOManagerAsyncTest.java | 4 +-- .../runtime/io/disk/iomanager/IOManagerITCase.java | 2 +- .../runtime/io/disk/iomanager/IOManagerTest.java | 33 -- .../runtime/operators/hash/HashTableITCase.java| 2 +- .../hash/HashTablePerformanceComparison.java | 5 +--- .../runtime/operators/hash/HashTableTest.java | 21 ++ .../hash/NonReusingHashJoinIteratorITCase.java | 2 +- .../operators/hash/ReOpenableHashTableITCase.java | 2 +- .../hash/ReOpenableHashTableTestBase.java | 2 +- .../hash/ReusingHashJoinIteratorITCase.java| 2 +- .../resettable/SpillingResettableIteratorTest.java | 2 +- ...pillingResettableMutableObjectIteratorTest.java | 2 +- .../AbstractSortMergeOuterJoinIteratorITCase.java | 2 +- .../sort/CombiningUnilateralSortMergerITCase.java | 2 +- .../runtime/operators/sort/ExternalSortITCase.java | 2 +- .../sort/ExternalSortLargeRecordsITCase.java | 2 +- .../sort/FixedLengthRecordSorterTest.java | 2 +- .../operators/sort/LargeRecordHandlerITCase.java | 
18 ++-- .../operators/sort/LargeRecordHandlerTest.java | 21 ++ ...NonReusingSortMergeInnerJoinIteratorITCase.java | 2 +- .../ReusingSortMergeInnerJoinIteratorITCase.java | 2 +- .../operators/sort/UnilateralSortMergerTest.java | 4 +-- .../testutils/BinaryOperatorTestBase.java | 2 +- .../operators/testutils/DriverTestBase.java| 2 +- .../operators/testutils/MockEnvironment.java | 2 +- .../operators/testutils/UnaryOperatorTestBase.java | 2 +- .../operators/util/HashVsSortMiniBenchmark.java| 2 +- .../streaming/runtime/io/BufferSpillerTest.java| 2 +- ...CheckpointBarrierAlignerAlignmentLimitTest.java | 2 +- .../CheckpointBarrierAlignerMassiveRandomTest.java | 7 + .../io/SpillingCheckpointBarrierAlignerTest.java | 2 +- .../StreamNetworkBenchmarkEnvironment.java | 2 +- .../runtime/tasks/StreamTaskTestHarness.java | 2 +- .../flink/table/runtime/aggregate/HashAggTest.java | 2 +- .../runtime/hashtable/BinaryHashTableTest.java | 2 +- .../io/CompressedHeaderlessChannelTest.java| 2 +- .../join/Int2SortMergeJoinOperatorTest.java| 2 +- .../runtime/sort/BinaryExternalSorterTest.java | 2 +- .../runtime/sort/BufferedKVExternalSorterTest.java | 2 +- .../manual/HashTableRecordWidthCombinations.java | 7 + .../flink/test/manual/MassiveStringSorting.java| 14 ++--- .../test/manual/MassiveStringValueSorting.java | 14 ++--- 54 files changed, 82 insertions(+), 192 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 6723597..ee54b1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue; /** * The facade for the provided I/O manager services. 
*/ -public abstract class IOManager { +public abstract class IOManager implements AutoCloseable { protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class); /** The temporary directories for files. */ @@ -85,7 +85,8 @@ public abstract class IOManager { * Close method, marks the I/O manager as closed * and removed all temporary files. */ - public void shutdown() { + @Override + public void close() { // remove all of our temp directories for (File path : paths) { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink
[flink] 06/06: [hotfix][runtime] Remove legacy NoOpIOManager class
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit a0f747dca5b22312c50056aacb7d5b6f8c3fa1ac Author: Zhijiang AuthorDate: Mon Jun 24 11:59:09 2019 +0800 [hotfix][runtime] Remove legacy NoOpIOManager class --- .../runtime/io/disk/iomanager/NoOpIOManager.java | 73 -- 1 file changed, 73 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java deleted file mode 100644 index f98c46f..000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.disk.iomanager; - -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.util.EnvironmentInformation; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * An {@link IOManager} that cannot do I/O but serves as a mock for tests. - */ -public class NoOpIOManager extends IOManager { - - public NoOpIOManager() { - super(new String[] {EnvironmentInformation.getTemporaryFileDirectory()}); - } - - @Override - public BlockChannelWriter createBlockChannelWriter(ID channelID, LinkedBlockingQueue returnQueue) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public BlockChannelWriterWithCallback createBlockChannelWriter(ID channelID, RequestDoneCallback callback) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public BlockChannelReader createBlockChannelReader(ID channelID, LinkedBlockingQueue returnQueue) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public BufferFileWriter createBufferFileWriter(ID channelID) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public BufferFileReader createBufferFileReader(ID channelID, RequestDoneCallback callback) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public BufferFileSegmentReader createBufferFileSegmentReader(ID channelID, RequestDoneCallback callback) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public BulkBlockChannelReader createBulkBlockChannelReader(ID channelID, List targetSegments, int numBlocks) throws IOException { - throw new UnsupportedOperationException(); - } -}
[flink] branch master updated (95944e2 -> a0f747d)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 95944e2 [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor new cdbfb82 [hotfix][runtime] Cleanup IOManager code new 5809ed3 [hotfix][runtime] IOManager implements AutoCloseable new 423e8a8 [hotfix][runtime] Refactor IOManager#close and remove #isProperlyShutDown new f9acd2f [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager new 80003d6 [FLINK-12735][network] Make shuffle environment implementation independent with IOManager new a0f747d [hotfix][runtime] Remove legacy NoOpIOManager class The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/runtime/io/disk/FileChannelManager.java | 37 ++-- .../runtime/io/disk/FileChannelManagerImpl.java| 126 + .../runtime/io/disk/iomanager/FileIOChannel.java | 78 .../flink/runtime/io/disk/iomanager/IOManager.java | 206 +++-- .../runtime/io/disk/iomanager/IOManagerAsync.java | 107 ++- .../io/network/NettyShuffleEnvironment.java| 13 ++ .../io/network/NettyShuffleServiceFactory.java | 17 +- .../network/partition/ResultPartitionFactory.java | 16 +- .../runtime/shuffle/ShuffleEnvironmentContext.java | 10 +- .../runtime/taskexecutor/TaskManagerServices.java | 11 +- .../NettyShuffleEnvironmentConfiguration.java | 23 ++- .../flink/runtime/io/disk/ChannelViewsTest.java| 7 +- .../runtime/io/disk/FileChannelStreamsITCase.java | 5 +- .../runtime/io/disk/FileChannelStreamsTest.java| 12 +- .../runtime/io/disk/NoOpFileChannelManager.java} | 33 ++-- .../io/disk/SeekableFileChannelInputViewTest.java | 6 +- .../flink/runtime/io/disk/SpillingBufferTest.java | 7 +- .../AsynchronousBufferFileWriterTest.java | 4 
+- .../iomanager/AsynchronousFileIOChannelTest.java | 21 +-- .../BufferFileWriterFileSegmentReaderTest.java | 4 +- .../disk/iomanager/BufferFileWriterReaderTest.java | 4 +- .../io/disk/iomanager/IOManagerAsyncTest.java | 7 +- .../IOManagerAsyncWithNoOpBufferFileWriter.java| 53 -- .../runtime/io/disk/iomanager/IOManagerITCase.java | 3 +- .../runtime/io/disk/iomanager/IOManagerTest.java | 35 ++-- .../runtime/io/disk/iomanager/NoOpIOManager.java | 73 .../io/network/NettyShuffleEnvironmentBuilder.java | 17 +- .../io/network/NettyShuffleEnvironmentTest.java| 23 ++- .../partition/BoundedBlockingSubpartitionTest.java | 24 ++- .../BoundedBlockingSubpartitionWriteReadTest.java | 21 ++- .../io/network/partition/PartitionTestUtils.java | 21 +++ .../network/partition/ResultPartitionBuilder.java | 12 +- .../partition/ResultPartitionFactoryTest.java | 22 ++- .../io/network/partition/ResultPartitionTest.java | 27 ++- .../runtime/operators/hash/HashTableITCase.java| 7 +- .../hash/HashTablePerformanceComparison.java | 5 +- .../runtime/operators/hash/HashTableTest.java | 21 +-- .../hash/NonReusingHashJoinIteratorITCase.java | 7 +- .../operators/hash/ReOpenableHashTableITCase.java | 7 +- .../hash/ReOpenableHashTableTestBase.java | 7 +- .../hash/ReusingHashJoinIteratorITCase.java| 7 +- .../resettable/SpillingResettableIteratorTest.java | 7 +- ...pillingResettableMutableObjectIteratorTest.java | 7 +- .../AbstractSortMergeOuterJoinIteratorITCase.java | 7 +- .../sort/CombiningUnilateralSortMergerITCase.java | 7 +- .../runtime/operators/sort/ExternalSortITCase.java | 7 +- .../sort/ExternalSortLargeRecordsITCase.java | 7 +- .../sort/FixedLengthRecordSorterTest.java | 4 +- .../operators/sort/LargeRecordHandlerITCase.java | 42 ++--- .../operators/sort/LargeRecordHandlerTest.java | 21 +-- ...NonReusingSortMergeInnerJoinIteratorITCase.java | 7 +- .../ReusingSortMergeInnerJoinIteratorITCase.java | 7 +- .../operators/sort/UnilateralSortMergerTest.java | 4 +- 
.../testutils/BinaryOperatorTestBase.java | 5 +- .../operators/testutils/DriverTestBase.java| 3 +- .../operators/testutils/MockEnvironment.java | 6 +- .../runtime/operators/testutils/TaskTestBase.java | 2 +- .../operators/testutils/UnaryOperatorTestBase.java | 3 +- .../operators/util/HashVsSortMiniBenchmark.java| 7 +- .../runtime/taskexecutor/TaskExecutorTest.java | 1 - .../streaming/runtime/io/BufferSpillerTest.java| 4 +- ...CheckpointBarrierAlignerAlignmentLimitTest.java | 4 +- .../CheckpointBarrierAlignerMassiveRandomTest.java | 7 +- .../io/SpillingCheckpointBarrierAlignerTest.
[flink] 05/06: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 80003d62f386fc95a3dbbc414f2cd4de7a26e1bd Author: Zhijiang AuthorDate: Thu Jun 27 00:29:16 2019 +0800 [FLINK-12735][network] Make shuffle environment implementation independent with IOManager The current creation of NettyShuffleEnvironment relies on IOManager from TaskManagerServices. Actually the shuffle only needs the file channel during creating partition, so it could internally create a light-weight FileChannelManager with its own prefix folder name instead of the heavy-weight IOManagerAsync. --- .../io/network/NettyShuffleEnvironment.java| 13 ++ .../io/network/NettyShuffleServiceFactory.java | 17 +--- .../network/partition/ResultPartitionFactory.java | 16 +++ .../runtime/shuffle/ShuffleEnvironmentContext.java | 10 + .../runtime/taskexecutor/TaskManagerServices.java | 9 ++-- .../NettyShuffleEnvironmentConfiguration.java | 23 -- .../runtime/io/disk/NoOpFileChannelManager.java| 51 ++ .../io/network/NettyShuffleEnvironmentBuilder.java | 17 .../io/network/NettyShuffleEnvironmentTest.java| 23 +- .../partition/BoundedBlockingSubpartitionTest.java | 24 +- .../BoundedBlockingSubpartitionWriteReadTest.java | 21 - .../io/network/partition/PartitionTestUtils.java | 21 + .../network/partition/ResultPartitionBuilder.java | 12 ++--- .../partition/ResultPartitionFactoryTest.java | 22 +- .../io/network/partition/ResultPartitionTest.java | 27 +++- .../StreamNetworkBenchmarkEnvironment.java | 7 --- 16 files changed, 250 insertions(+), 63 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java index 5171d75..17fb2cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; @@ -85,6 +86,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment inputGatesById; private final ResultPartitionFactory resultPartitionFactory; @@ -99,6 +102,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment(10); + this.fileChannelManager = fileChannelManager; this.resultPartitionFactory = resultPartitionFactory; this.singleInputGateFactory = singleInputGateFactory; this.isClosed = false; @@ -326,6 +331,14 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment { + private static final String DIR_NAME_PREFIX = "netty-shuffle"; + @Override public NettyShuffleMaster createShuffleMaster(Configuration configuration) { return NettyShuffleMaster.INSTANCE; @@ -62,8 +65,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory shuffleEnvironment = createShuffleEnvironment( taskManagerServicesConfiguration, taskEventDispatcher, - taskManagerMetricGroup, - ioManager); + taskManagerMetricGroup); final int dataPort = shuffleEnvironment.start(); final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); @@ -307,8 +306,7 @@ public class TaskManagerServices { private static ShuffleEnvironment createShuffleEnvironment( TaskManagerServicesConfiguration taskManagerServicesConfiguration, 
TaskEventDispatcher taskEventDispatcher, - MetricGroup taskManagerMetricGroup, - IOManager ioManager) throws FlinkException { + MetricGroup taskManagerMetricGroup) throws FlinkException { final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext( taskManagerServicesConfiguration.getConfiguration(), @@ -317,8 +315,7 @@ public class TaskManagerServices {
[flink] 01/06: [hotfix][runtime] Cleanup IOManager code
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit cdbfb82ef3eaa242abf6d070463c0895ac244ef1 Author: Zhijiang AuthorDate: Wed Jun 19 12:36:54 2019 +0800 [hotfix][runtime] Cleanup IOManager code --- .../runtime/io/disk/iomanager/FileIOChannel.java | 78 ++-- .../flink/runtime/io/disk/iomanager/IOManager.java | 83 +++--- .../IOManagerAsyncWithNoOpBufferFileWriter.java| 53 -- .../operators/sort/LargeRecordHandlerITCase.java | 26 --- 4 files changed, 91 insertions(+), 149 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java index fd8e8e6..ef57e03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java @@ -18,93 +18,90 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.flink.util.StringUtils; + import java.io.File; import java.io.IOException; import java.nio.channels.FileChannel; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.flink.util.StringUtils; - /** * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of * files that contain sorted runs of data from the same stream, that will later on be merged together. */ public interface FileIOChannel { - + /** * Gets the channel ID of this I/O channel. -* +* * @return The channel ID. */ - FileIOChannel.ID getChannelID(); - + ID getChannelID(); + /** * Gets the size (in bytes) of the file underlying the channel. -* -* @return The size (in bytes) of the file underlying the channel. */ long getSize() throws IOException; - + /** * Checks whether the channel has been closed. 
-* +* * @return True if the channel has been closed, false otherwise. */ boolean isClosed(); /** - * Closes the channel. For asynchronous implementations, this method waits until all pending requests are - * handled. Even if an exception interrupts the closing, the underlying FileChannel is closed. - * - * @throws IOException Thrown, if an error occurred while waiting for pending requests. - */ +* Closes the channel. For asynchronous implementations, this method waits until all pending requests are +* handled. Even if an exception interrupts the closing, the underlying FileChannel is closed. +* +* @throws IOException Thrown, if an error occurred while waiting for pending requests. +*/ void close() throws IOException; /** * Deletes the file underlying this I/O channel. -* +* * @throws IllegalStateException Thrown, when the channel is still open. */ void deleteChannel(); - - /** - * Closes the channel and deletes the underlying file. - * For asynchronous implementations, this method waits until all pending requests are handled; - * - * @throws IOException Thrown, if an error occurred while waiting for pending requests. - */ - public void closeAndDelete() throws IOException; FileChannel getNioFileChannel(); - + + /** +* Closes the channel and deletes the underlying file. For asynchronous implementations, +* this method waits until all pending requests are handled. +* +* @throws IOException Thrown, if an error occurred while waiting for pending requests. +*/ + void closeAndDelete() throws IOException; + // // - + /** * An ID identifying an underlying file channel. */ - public static class ID { - + class ID { + private static final int RANDOM_BYTES_LENGTH = 16; - + private final File path; - + private final int threadNum; - protected ID(File path, int threadNum) { + private ID(File path, int threadNum) { this.path = path; this.threadNum = threadNum; } - protected ID(File basePath, int threadNum, Random random) { + public ID(File basePath, int t
[flink] 04/06: [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f9acd2ff317b4a6181e85ba50ddbe177573351d7 Author: Zhijiang AuthorDate: Mon Jul 1 23:31:30 2019 +0800 [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager IOManager mainly has two roles. One is for managing file channels based on config temp dirs, and the other is for abstracting ways to read/write files. We could define a FileChannelManager class for handling the file channels, which could be reused for the shuffle environment in the future. To do so, the shuffle environment does not need to rely on the whole IOManager. --- .../flink/runtime/io/disk/FileChannelManager.java | 45 .../runtime/io/disk/FileChannelManagerImpl.java| 126 + .../flink/runtime/io/disk/iomanager/IOManager.java | 119 ++- 3 files changed, 203 insertions(+), 87 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java new file mode 100644 index 000..22079db --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; + +import java.io.File; + +/** + * The manager used for creating/getting file IO channels based on config temp dirs. + */ +public interface FileChannelManager extends AutoCloseable { + + /** +* Creates an ID identifying an underlying file channel and returns it. +*/ + ID createChannel(); + + /** +* Creates an enumerator for channels that logically belong together and returns it. +*/ + Enumerator createChannelEnumerator(); + + /** +* Gets all the files corresponding to the config temp dirs. +*/ + File[] getPaths(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java new file mode 100644 index 000..2bdb8d9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.disk; + +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The manager used for creating/deleting file channels based on config temp dirs. + */ +public class FileChannelManagerImpl implements FileChannelManager { + priv
[flink] 02/02: [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 95944e231315f085d5e23717332aa2866caa5d8a Author: Dawid Wysakowicz AuthorDate: Wed Jun 19 15:00:23 2019 +0200 [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor This closes #8852. --- flink-python/pyflink/table/table_config.py | 36 .../table/tests/test_table_environment_api.py | 12 -- .../client/gateway/local/ExecutionContextTest.java | 3 +- .../client/gateway/local/LocalExecutorITCase.java | 7 +- .../table/api/java/BatchTableEnvironment.java | 5 +- .../table/api/java/StreamTableEnvironment.java | 155 +- .../java/internal/StreamTableEnvironmentImpl.java | 66 +++--- .../flink/table/api/EnvironmentSettings.java | 228 + .../org/apache/flink/table/api/TableConfig.java| 44 .../apache/flink/table/api/TableEnvironment.java | 22 +- .../table/api/internal/TableEnvironmentImpl.java | 4 +- .../flink/table/delegation/ExecutorFactory.java| 50 + .../internal => delegation}/PlannerFactory.java| 49 ++--- .../flink/table/factories/ComponentFactory.java| 53 + .../table/factories/ComponentFactoryService.java | 80 .../factories/ComponentFactoryServiceTest.java | 68 ++ .../factories/utils/OtherTestPlannerFactory.java | 28 +++ .../table/factories/utils/TestPlannerFactory.java | 69 +++ .../org.apache.flink.table.factories.TableFactory | 6 +- .../table/api/scala/BatchTableEnvironment.scala| 7 +- .../table/api/scala/StreamTableEnvironment.scala | 132 .../internal/StreamTableEnvironmentImpl.scala | 90 .../flink/table/executor/StreamExecutor.java | 4 +- ...utorFactory.java => StreamExecutorFactory.java} | 49 +++-- .../flink/table/planner/StreamPlannerFactory.java | 70 +++ .../org.apache.flink.table.factories.TableFactory | 2 + .../flink/table/api/internal/TableEnvImpl.scala| 4 +- .../api/stream/StreamTableEnvironmentTest.scala| 26 ++- 
.../apache/flink/table/utils/TableTestBase.scala | 76 +++ 29 files changed, 1052 insertions(+), 393 deletions(-) diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py index 7eb3513..d6b5864 100644 --- a/flink-python/pyflink/table/table_config.py +++ b/flink-python/pyflink/table/table_config.py @@ -90,39 +90,3 @@ class TableConfig(object): self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length) else: raise Exception("TableConfig.max_generated_code_length should be a int value!") - -def get_built_in_catalog_name(self): -""" -Gets the specified name of the initial catalog to be created when instantiating -:class:`TableEnvironment`. -""" -return self._j_table_config.getBuiltInCatalogName() - -def set_built_in_catalog_name(self, built_in_catalog_name): -""" -Specifies the name of the initial catalog to be created when instantiating -:class:`TableEnvironment`. This method has no effect if called on the -:func:`~pyflink.table.TableEnvironment.get_config`. -""" -if built_in_catalog_name is not None and isinstance(built_in_catalog_name, str): -self._j_table_config.setBuiltInCatalogName(built_in_catalog_name) -else: -raise Exception("TableConfig.built_in_catalog_name should be a string value!") - -def get_built_in_database_name(self): -""" -Gets the specified name of the default database in the initial catalog to be created when -instantiating :class:`TableEnvironment`. -""" -return self._j_table_config.getBuiltInDatabaseName() - -def set_built_in_database_name(self, built_in_database_name): -""" -Specifies the name of the default database in the initial catalog to be created when -instantiating :class:`TableEnvironment`. This method has no effect if called on the -:func:`~pyflink.table.TableEnvironment.get_config`. 
-""" -if built_in_database_name is not None and isinstance(built_in_database_name, str): -self._j_table_config.setBuiltInDatabaseName(built_in_database_name) -else: -raise Exception("TableConfig.built_in_database_name should be a string value!") diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index a129d22..4eba1bb 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -170,8 +170,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase): table_config.set
[flink] branch master updated (9fa61aa -> 95944e2)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 9fa61aa [FLINK-13029][table-planner] Removed usages of ExpressionBridge in QueryOperation's factories new 1a224b3 [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService new 95944e2 [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: flink-python/pyflink/table/table_config.py | 36 .../table/tests/test_table_environment_api.py | 12 -- .../client/gateway/local/ExecutionContextTest.java | 3 +- .../client/gateway/local/LocalExecutorITCase.java | 7 +- .../table/api/java/BatchTableEnvironment.java | 5 +- .../table/api/java/StreamTableEnvironment.java | 155 +- .../java/internal/StreamTableEnvironmentImpl.java | 66 +++--- .../flink/table/api/EnvironmentSettings.java | 228 + .../org/apache/flink/table/api/TableConfig.java| 44 .../apache/flink/table/api/TableEnvironment.java | 22 +- .../table/api/internal/TableEnvironmentImpl.java | 4 +- .../flink/table/delegation/ExecutorFactory.java| 50 + .../internal => delegation}/PlannerFactory.java| 49 ++--- .../flink/table/factories/ComponentFactory.java} | 32 ++- .../table/factories/ComponentFactoryService.java | 80 .../factories/ComponentFactoryServiceTest.java | 68 ++ .../factories/utils/OtherTestPlannerFactory.java | 12 +- .../table/factories/utils/TestPlannerFactory.java | 69 +++ .../org.apache.flink.table.factories.TableFactory | 3 +- .../table/api/scala/BatchTableEnvironment.scala| 7 +- .../table/api/scala/StreamTableEnvironment.scala | 132 .../internal/StreamTableEnvironmentImpl.scala | 90 
.../table/api/AmbiguousTableFactoryException.java | 36 ++-- .../apache/flink/table/factories/TableFactory.java | 1 - .../flink/table/factories/TableFactoryService.java | 146 - .../flink/table/executor/StreamExecutor.java | 4 +- ...utorFactory.java => StreamExecutorFactory.java} | 49 +++-- .../flink/table/planner/StreamPlannerFactory.java | 70 +++ .../org.apache.flink.table.factories.TableFactory | 2 + .../flink/table/api/internal/TableEnvImpl.scala| 4 +- .../api/stream/StreamTableEnvironmentTest.scala| 26 ++- .../factories/TableFormatFactoryServiceTest.scala | 7 - .../apache/flink/table/utils/TableTestBase.scala | 76 +++ 33 files changed, 1115 insertions(+), 480 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/{api/internal => delegation}/PlannerFactory.java (55%) copy flink-table/{flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java => flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java} (52%) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java copy flink-core/src/main/java/org/apache/flink/util/SerializableObject.java => flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java (72%) create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java copy {flink-connectors/flink-connector-kafka-0.10/src/main => 
flink-table/flink-table-api-java/src/test}/resources/META-INF/services/org.apache.flink.table.factories.TableFactory (86%) rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/{ExecutorFactory.java => StreamExecutorFactory.java} (51%) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
[flink] 01/02: [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1a224b3e0b2e11fcd580edb5f8060f7ffd3c61a7 Author: Dawid Wysakowicz AuthorDate: Wed Jun 19 15:00:02 2019 +0200 [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService This commit adds methods TableFactoryService.findAll that return TableFactories even if they are ambiguous based on the requiredContext and supportedProperties. Additionally it fixes minor issues and improves type handling. --- .../table/api/AmbiguousTableFactoryException.java | 36 +++-- .../apache/flink/table/factories/TableFactory.java | 1 - .../flink/table/factories/TableFactoryService.java | 146 ++--- .../factories/TableFormatFactoryServiceTest.scala | 7 - 4 files changed, 117 insertions(+), 73 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java index 395d5c98..ea1c72e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java @@ -31,20 +31,20 @@ import java.util.stream.Collectors; public class AmbiguousTableFactoryException extends RuntimeException { // factories that match the properties - private final List matchingFactories; + private final List matchingFactories; // required factory class - private final Class factoryClass; + private final Class factoryClass; // all found factories private final List factories; // properties that describe the configuration private final Map properties; public AmbiguousTableFactoryException( - List matchingFactories, - Class factoryClass, - List factories, - Map properties, - Throwable 
cause) { + List matchingFactories, + Class factoryClass, + List factories, + Map properties, + Throwable cause) { super(cause); this.matchingFactories = matchingFactories; @@ -54,10 +54,10 @@ public class AmbiguousTableFactoryException extends RuntimeException { } public AmbiguousTableFactoryException( - List matchingFactories, - Class factoryClass, - List factories, - Map properties) { + List matchingFactories, + Class factoryClass, + List factories, + Map properties) { this(matchingFactories, factoryClass, factories, properties, null); } @@ -70,15 +70,13 @@ public class AmbiguousTableFactoryException extends RuntimeException { "The following properties are requested:\n%s\n\n" + "The following factories have been considered:\n%s", factoryClass.getName(), - String.join( - "\n", - matchingFactories.stream() - .map(p -> p.getClass().getName()).collect(Collectors.toList())), + matchingFactories.stream() + .map(p -> p.getClass().getName()) + .collect(Collectors.joining("\n")), DescriptorProperties.toString(properties), - String.join( - "\n", - factories.stream().map(p -> p.getClass().getName()).collect(Collectors.toList()) - ) + factories.stream() + .map(p -> p.getClass().getName()) + .collect(Collectors.joining("\n")) ); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java index b56ffa1..d4c0628 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java @@ -55,7 +55,6 @@ public interface TableFactory { */ Map requiredContext(); - /** * List of property keys that this factory can handle. This method will be used for validation. * If a property i
[flink] branch master updated (036c549 -> 9fa61aa)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 036c549 [FLINK-13028][table-api-java] Merge flatten and call resolution rule add b9ec89b [FLINK-13029][table-planner] Ported GROUP BY expression to new type system add 9fa61aa [FLINK-13029][table-planner] Removed usages of ExpressionBridge in QueryOperation's factories No new revisions were added by this update. Summary of changes: .../utils/LegacyTypeInfoDataTypeConverter.java | 27 +- .../operations/AggregateOperationFactory.java | 99 +++--- .../table/operations/JoinOperationFactory.java | 19 ++--- .../operations/OperationTreeBuilderFactory.java| 7 -- .../operations/ProjectionOperationFactory.java | 19 ++--- .../flink/table/api/internal/TableEnvImpl.scala| 1 - .../expressions/PlannerExpressionConverter.scala | 42 + .../operations/OperationTreeBuilderImpl.scala | 17 ++-- .../flink/table/api/batch/table/CalcTest.scala | 44 ++ .../api/validation/TableSourceValidationTest.scala | 45 -- 10 files changed, 181 insertions(+), 139 deletions(-)
[flink] 03/05: [FLINK-13028][table-api-java] Refactor local over windows
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 8c1968ac2880c3784c6c35ffb819932483208807 Author: Timo Walther AuthorDate: Mon Jul 1 09:12:41 2019 +0200 [FLINK-13028][table-api-java] Refactor local over windows --- .../flink/table/expressions/LocalOverWindow.java | 76 ++ .../table/expressions/ExpressionResolver.java | 21 +++--- .../expressions/rules/OverWindowResolverRule.java | 16 ++--- .../table/expressions/rules/ResolverRule.java | 6 +- .../flink/table/plan/logical/groupWindows.scala| 15 + 5 files changed, 98 insertions(+), 36 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java new file mode 100644 index 000..45e1a7a --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** + * Local over window created during expression resolution. + */ +@Internal +public final class LocalOverWindow { + + private Expression alias; + + private List partitionBy; + + private Expression orderBy; + + private Expression preceding; + + private @Nullable Expression following; + + LocalOverWindow( + Expression alias, + List partitionBy, + Expression orderBy, + Expression preceding, + @Nullable Expression following) { + this.alias = alias; + this.partitionBy = partitionBy; + this.orderBy = orderBy; + this.preceding = preceding; + this.following = following; + } + + public Expression getAlias() { + return alias; + } + + public List getPartitionBy() { + return partitionBy; + } + + public Expression getOrderBy() { + return orderBy; + } + + public Expression getPreceding() { + return preceding; + } + + public Optional getFollowing() { + return Optional.ofNullable(following); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java index 015751c..ea45f33 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java @@ -30,7 +30,6 @@ import org.apache.flink.table.expressions.rules.ResolverRules; import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.plan.logical.LogicalOverWindow; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; @@ 
-100,13 +99,13 @@ public class ExpressionResolver { private final Map localReferences; - private final Map overWindows; + private final Map localOverWindows; private ExpressionResolver( TableReferenceLookup tableLookup, FunctionLookup functionLookup, FieldReferenceLookup fieldLookup, - List overWindows, + List localOverWindows, List localReferences) { this.tableLookup = Preconditions.checkNotNull(tableLookup); this.fieldLookup = Preconditions.checkNotNull(fieldLook
[flink] 04/05: [FLINK-13028][table] Refactor expression package structure
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 14fe641fc97888ccc7035051a697334ec39a0e62 Author: Timo Walther AuthorDate: Mon Jul 1 10:49:50 2019 +0200 [FLINK-13028][table] Refactor expression package structure --- .../table/api/OverWindowPartitionedOrdered.java | 2 +- .../table/api/internal/TableEnvironmentImpl.java| 2 +- .../apache/flink/table/api/internal/TableImpl.java | 2 +- .../PlannerExpressionParser.java| 8 +--- .../flink/table/expressions/ExpressionParser.java | 1 + .../expressions/resolver}/ExpressionResolver.java | 21 ++--- .../expressions/{ => resolver}/LocalOverWindow.java | 5 ++--- .../{ => resolver}/LookupCallResolver.java | 6 +- .../resolver}/lookups/FieldReferenceLookup.java | 2 +- .../lookups/TableReferenceLookup.java | 2 +- .../resolver}/rules/ExpandColumnFunctionsRule.java | 8 .../resolver}/rules/LookupCallByNameRule.java | 4 ++-- .../resolver}/rules/OverWindowResolverRule.java | 8 .../rules/QualifyBuiltInFunctionsRule.java | 2 +- .../resolver}/rules/ReferenceResolverRule.java | 7 --- .../resolver}/rules/ResolveCallByArgumentsRule.java | 2 +- .../resolver}/rules/ResolveFlattenCallRule.java | 4 ++-- .../expressions/resolver}/rules/ResolverRule.java | 10 +- .../expressions/resolver}/rules/ResolverRules.java | 2 +- .../resolver}/rules/RuleExpressionVisitor.java | 6 +++--- .../rules/StarReferenceFlatteningRule.java | 2 +- .../{ => utils}/ApiExpressionDefaultVisitor.java| 13 - .../expressions/{ => utils}/ApiExpressionUtils.java | 11 ++- .../ResolvedExpressionDefaultVisitor.java | 10 +- .../table/operations/OperationExpressionsUtils.java | 10 +- .../flink/table/typeutils/FieldInfoUtils.java | 2 +- .../flink/table/operations/QueryOperationTest.java | 2 +- .../flink/table/expressions/ExpressionBuilder.java | 1 + .../functions/aggfunctions/AvgAggFunction.java | 2 +- .../functions/aggfunctions/ConcatAggFunction.java | 2 +- 
.../functions/aggfunctions/Count1AggFunction.java | 2 +- .../functions/aggfunctions/CountAggFunction.java| 2 +- .../aggfunctions/DeclarativeAggregateFunction.java | 2 +- .../functions/aggfunctions/IncrSumAggFunction.java | 2 +- .../aggfunctions/IncrSumWithRetractAggFunction.java | 2 +- .../functions/aggfunctions/LeadLagAggFunction.java | 2 +- .../functions/aggfunctions/MaxAggFunction.java | 2 +- .../functions/aggfunctions/MinAggFunction.java | 2 +- .../functions/aggfunctions/RankAggFunction.java | 2 +- .../aggfunctions/RankLikeAggFunctionBase.java | 2 +- .../aggfunctions/RowNumberAggFunction.java | 2 +- .../aggfunctions/SingleValueAggFunction.java| 2 +- .../functions/aggfunctions/Sum0AggFunction.java | 2 +- .../functions/aggfunctions/SumAggFunction.java | 2 +- .../aggfunctions/SumWithRetractAggFunction.java | 2 +- .../table/codegen/agg/DeclarativeAggCodeGen.scala | 4 ++-- .../table/codegen/agg/batch/AggCodeGenHelper.scala | 3 ++- .../codegen/agg/batch/HashAggCodeGenHelper.scala| 3 ++- .../logical/LogicalWindowAggregateRuleBase.scala| 2 +- .../flink/table/plan/util/RexNodeExtractor.scala| 2 +- .../flink/table/sources/TableSourceUtil.scala | 2 +- .../table/sources/tsextractors/ExistingField.scala | 2 +- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 2 +- .../table/plan/util/RexNodeExtractorTest.scala | 2 +- .../apache/flink/table/util/testTableSources.scala | 2 +- .../table/operations/AggregateOperationFactory.java | 6 +++--- .../flink/table/operations/AliasOperationUtils.java | 8 .../table/operations/CalculatedTableFactory.java| 4 ++-- .../table/operations/ColumnOperationUtils.java | 6 +++--- .../table/operations/JoinOperationFactory.java | 2 +- .../operations/OperationTreeBuilderFactory.java | 2 +- .../operations/ProjectionOperationFactory.java | 4 ++-- .../table/operations/SortOperationFactory.java | 4 ++-- .../flink/table/plan/QueryOperationConverter.java | 4 ++-- .../table/api/internal/BatchTableEnvImpl.scala | 3 ++- 
.../flink/table/api/internal/TableEnvImpl.scala | 2 +- .../flink/table/api/scala/expressionDsl.scala | 2 +- .../flink/table/expressions/ExpressionBridge.scala | 1 + .../expressions/PlannerExpressionParserImpl.scala | 3 ++- .../table/operations/OperationTreeBuilderImpl.scala | 8 +--- .../flink/table/plan/util/RexProgramExtractor.scala | 2 +- .../flink/table/expressions/KeywordParseTest.scala | 2 +- 72 files changed, 163 insertions(+), 113 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apa
[flink] 02/05: [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 272ac71328aac0583df859bf197e30afcec0981f Author: Timo Walther AuthorDate: Mon Jul 1 08:25:05 2019 +0200 [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver --- .../table/expressions/ExpressionResolver.java | 98 +++--- .../expressions/rules/OverWindowResolverRule.java | 50 ++- .../rules/ResolveCallByArgumentsRule.java | 26 -- ...enCallRule.java => ResolveFlattenCallRule.java} | 39 + .../table/expressions/rules/ResolverRule.java | 6 -- .../table/expressions/rules/ResolverRules.java | 4 +- .../table/validation/CalcValidationTest.scala | 12 +++ 7 files changed, 119 insertions(+), 116 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java index 394180b..015751c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java @@ -19,14 +19,9 @@ package org.apache.flink.table.expressions; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.GroupWindow; import org.apache.flink.table.api.OverWindow; -import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias; -import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.lookups.FieldReferenceLookup; import 
org.apache.flink.table.expressions.lookups.TableReferenceLookup; @@ -36,15 +31,9 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.plan.logical.LogicalOverWindow; -import org.apache.flink.table.plan.logical.LogicalWindow; -import org.apache.flink.table.plan.logical.SessionGroupWindow; -import org.apache.flink.table.plan.logical.SlidingGroupWindow; -import org.apache.flink.table.plan.logical.TumblingGroupWindow; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Preconditions; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -54,11 +43,8 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import scala.Some; - import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral; import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral; -import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; /** * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression} @@ -97,15 +83,13 @@ public class ExpressionResolver { ResolverRules.EXPAND_COLUMN_FUNCTIONS, ResolverRules.OVER_WINDOWS, ResolverRules.FIELD_RESOLVE, - ResolverRules.FLATTEN_CALL, ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS, - ResolverRules.RESOLVE_CALL_BY_ARGUMENTS); + ResolverRules.RESOLVE_CALL_BY_ARGUMENTS, + ResolverRules.FLATTEN_CALL); } private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor(); - private final PlannerExpressionConverter bridgeConverter = PlannerExpressionConverter.INSTANCE(); - private final FieldReferenceLookup fieldLookup; private final TableReferenceLookup tableLookup; @@ -187,54 +171,6 @@ public class ExpressionResolver { } /** -* 
Converts an API class to a logical window for planning with expressions already resolved. -* -* @param window window to resolve -* @return logical window with expressions resolved -*/ - public LogicalWindow resolveGroupWindow(GroupWindow window) { - Expression alias = window.getAlias(); - - if (!(alias instanceof UnresolvedReferenceExpression)) { - throw new ValidationException("Alias of group window should be an UnresolvedFieldReference"); - } - - final String windowName = ((UnresolvedReferenceExpression)
[flink] 05/05: [FLINK-13028][table-api-java] Merge flatten and call resolution rule
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 036c5492a9bfe7636d5e7d5eb664af5e77952707 Author: Timo Walther AuthorDate: Mon Jul 1 15:25:04 2019 +0200 [FLINK-13028][table-api-java] Merge flatten and call resolution rule --- .../expressions/resolver/ExpressionResolver.java | 3 +- .../resolver/rules/ResolveCallByArgumentsRule.java | 94 --- .../resolver/rules/ResolveFlattenCallRule.java | 101 - .../expressions/resolver/rules/ResolverRules.java | 5 - .../flink/table/api/batch/table/CalcTest.scala | 1 + .../table/validation/CalcValidationTest.scala | 12 --- 6 files changed, 62 insertions(+), 154 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java index 4c1e62d..e4c07fa 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java @@ -90,8 +90,7 @@ public class ExpressionResolver { ResolverRules.OVER_WINDOWS, ResolverRules.FIELD_RESOLVE, ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS, - ResolverRules.RESOLVE_CALL_BY_ARGUMENTS, - ResolverRules.FLATTEN_CALL); + ResolverRules.RESOLVE_CALL_BY_ARGUMENTS); } private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index 73429f4..1f184c8 100644 --- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -19,6 +19,8 @@ package org.apache.flink.table.expressions.resolver.rules; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -38,21 +40,26 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.flink.table.types.inference.TypeStrategies; import org.apache.flink.util.Preconditions; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.singletonList; +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral; +import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; + /** * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers * the output data type. All function calls are resolved {@link CallExpression} after applying this - * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}. + * rule. + * + * This rule also resolves {@code flatten()} calls on composite types. * * If the call expects different types of arguments, but the given arguments have types that can * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted. 
- * - * @see ResolveFlattenCallRule */ @Internal final class ResolveCallByArgumentsRule implements ResolverRule { @@ -60,39 +67,27 @@ final class ResolveCallByArgumentsRule implements ResolverRule { @Override public List apply(List expression, ResolutionContext context) { return expression.stream() - .map(expr -> expr.accept(new CallArgumentsCastingVisitor(context))) + .flatMap(expr -> expr.accept(new ResolvingCallVisitor(context)).stream()) .collect(Collectors.toList()); } - private class CallArgumentsCastingVisitor extends RuleExpressionVisitor { + // + + private class ResolvingCallVisitor extends RuleExpressionVisit
[flink] branch master updated (c3f2ad8 -> 036c549)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c3f2ad8 [FLINK-12809][table-api] Introduce PartitionableTableSink for supporting writing data into partitions new 6161043 [FLINK-13028][table-api-java] Extract legacy type inference logic new 272ac71 [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver new 8c1968a [FLINK-13028][table-api-java] Refactor local over windows new 14fe641 [FLINK-13028][table] Refactor expression package structure new 036c549 [FLINK-13028][table-api-java] Merge flatten and call resolution rule The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../table/api/OverWindowPartitionedOrdered.java| 2 +- .../table/api/internal/TableEnvironmentImpl.java | 2 +- .../apache/flink/table/api/internal/TableImpl.java | 2 +- .../PlannerExpressionParser.java | 8 +- .../table/delegation/PlannerTypeInferenceUtil.java | 75 .../flink/table/expressions/ExpressionParser.java | 1 + .../expressions/resolver}/ExpressionResolver.java | 137 -- .../expressions/resolver/LocalOverWindow.java | 75 .../{ => resolver}/LookupCallResolver.java | 6 +- .../resolver}/lookups/FieldReferenceLookup.java| 2 +- .../lookups/TableReferenceLookup.java | 2 +- .../resolver}/rules/ExpandColumnFunctionsRule.java | 8 +- .../resolver}/rules/LookupCallByNameRule.java | 4 +- .../resolver}/rules/OverWindowResolverRule.java| 68 +-- .../rules/QualifyBuiltInFunctionsRule.java | 2 +- .../resolver}/rules/ReferenceResolverRule.java | 7 +- .../rules/ResolveCallByArgumentsRule.java | 209 + .../expressions/resolver}/rules/ResolverRule.java | 20 +- .../expressions/resolver}/rules/ResolverRules.java | 7 +- .../resolver}/rules/RuleExpressionVisitor.java | 6 +- 
.../rules/StarReferenceFlatteningRule.java | 2 +- .../{ => utils}/ApiExpressionDefaultVisitor.java | 13 +- .../{ => utils}/ApiExpressionUtils.java| 11 +- .../ResolvedExpressionDefaultVisitor.java | 10 +- .../operations/OperationExpressionsUtils.java | 10 +- .../flink/table/typeutils/FieldInfoUtils.java | 2 +- .../flink/table/operations/QueryOperationTest.java | 2 +- .../flink/table/expressions/ExpressionBuilder.java | 1 + .../functions/aggfunctions/AvgAggFunction.java | 2 +- .../functions/aggfunctions/ConcatAggFunction.java | 2 +- .../functions/aggfunctions/Count1AggFunction.java | 2 +- .../functions/aggfunctions/CountAggFunction.java | 2 +- .../aggfunctions/DeclarativeAggregateFunction.java | 2 +- .../functions/aggfunctions/IncrSumAggFunction.java | 2 +- .../IncrSumWithRetractAggFunction.java | 2 +- .../functions/aggfunctions/LeadLagAggFunction.java | 2 +- .../functions/aggfunctions/MaxAggFunction.java | 2 +- .../functions/aggfunctions/MinAggFunction.java | 2 +- .../functions/aggfunctions/RankAggFunction.java| 2 +- .../aggfunctions/RankLikeAggFunctionBase.java | 2 +- .../aggfunctions/RowNumberAggFunction.java | 2 +- .../aggfunctions/SingleValueAggFunction.java | 2 +- .../functions/aggfunctions/Sum0AggFunction.java| 2 +- .../functions/aggfunctions/SumAggFunction.java | 2 +- .../aggfunctions/SumWithRetractAggFunction.java| 2 +- .../table/codegen/agg/DeclarativeAggCodeGen.scala | 4 +- .../table/codegen/agg/batch/AggCodeGenHelper.scala | 3 +- .../codegen/agg/batch/HashAggCodeGenHelper.scala | 3 +- .../logical/LogicalWindowAggregateRuleBase.scala | 2 +- .../flink/table/plan/util/RexNodeExtractor.scala | 2 +- .../flink/table/sources/TableSourceUtil.scala | 2 +- .../table/sources/tsextractors/ExistingField.scala | 2 +- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 2 +- .../table/plan/util/RexNodeExtractorTest.scala | 2 +- .../apache/flink/table/util/testTableSources.scala | 2 +- .../expressions/PlannerTypeInferenceUtilImpl.java | 140 ++ 
.../table/expressions/rules/FlattenCallRule.java | 90 - .../operations/AggregateOperationFactory.java | 6 +- .../table/operations/AliasOperationUtils.java | 8 +- .../table/operations/CalculatedTableFactory.java | 4 +- .../table/operations/ColumnOperationUtils.java | 6 +- .../table/operations/JoinOperationFactory.java | 2 +- .../operations/OperationTreeBuilderFactory.java| 2 +- .../operations/ProjectionOperationFactory.java | 4 +- .../table/operations/SortOperationFactory.java | 4
[flink] 01/05: [FLINK-13028][table-api-java] Extract legacy type inference logic
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6161043a2afbd458f2587f3f63df60f620c755b7 Author: Timo Walther AuthorDate: Fri Jun 28 13:34:29 2019 +0200 [FLINK-13028][table-api-java] Extract legacy type inference logic --- .../table/delegation/PlannerTypeInferenceUtil.java | 75 +++ .../expressions/PlannerTypeInferenceUtilImpl.java | 140 + .../rules/ResolveCallByArgumentsRule.java | 127 --- 3 files changed, 237 insertions(+), 105 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java new file mode 100644 index 000..4e003ff --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.delegation; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeInferenceUtil; + +import java.lang.reflect.Constructor; +import java.util.List; + +/** + * Temporary utility for validation and output type inference until all {@code PlannerExpression} are + * upgraded to work with {@link TypeInferenceUtil}. + */ +@Internal +public interface PlannerTypeInferenceUtil { + + static PlannerTypeInferenceUtil create() { + return SingletonPlannerTypeInferenceUtil.getPlannerTypeInferenceUtil(); + } + + /** +* Same behavior as {@link TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}. +*/ + TypeInferenceUtil.Result runTypeInference( + UnresolvedCallExpression unresolvedCall, + List resolvedArgs); + + /** +* A singleton pattern utility for avoiding creating many {@link PlannerTypeInferenceUtil}. 
+*/ + class SingletonPlannerTypeInferenceUtil { + + private static PlannerTypeInferenceUtil plannerTypeInferenceUtil; + + public static PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() { + if (plannerTypeInferenceUtil == null) { + try { + final Class clazz = + Class.forName("org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl"); + final Constructor con = clazz.getConstructor(); + plannerTypeInferenceUtil = (PlannerTypeInferenceUtil) con.newInstance(); + } catch (Throwable t) { + throw new TableException("Instantiation of PlannerTypeInferenceUtil failed.", t); + } + } + return plannerTypeInferenceUtil; + } + + private SingletonPlannerTypeInferenceUtil() { + // no instantiation + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java new file mode 100644 index 000..30c3421 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additiona
[flink] 03/03: [FLINK-12809][table-api] Introduce PartitionableTableSink for supporting writing data into partitions
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c3f2ad8c8c10097dc7f680ccdf2e9b99babc349c Author: 云邪 AuthorDate: Wed Jun 12 11:01:40 2019 +0800 [FLINK-12809][table-api] Introduce PartitionableTableSink for supporting writing data into partitions --- .../flink/table/sinks/PartitionableTableSink.java | 112 + 1 file changed, 112 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java new file mode 100644 index 000..5e7c26c --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; +import java.util.Map; + +/** + * An interface for partitionable {@link TableSink}. A partitionable sink can write + * query results to partitions. 
+ * + * Partition columns are defined via {@link #getPartitionFieldNames()} and the field names + * should be sorted in a strict order. And all the partition fields should exist in the + * {@link TableSink#getTableSchema()}. + * + * For example, a partitioned table named {@code my_table} with a table schema + * {@code [a INT, b VARCHAR, c DOUBLE, dt VARCHAR, country VARCHAR]} is partitioned on columns + * {@code dt, country}. Then {@code dt} is the first partition column, and + * {@code country} is the secondary partition column. + * + * We can insert data into table partitions using INSERT INTO PARTITION syntax, for example: + * + * + * INSERT INTO my_table PARTITION (dt='2019-06-20', country='bar') select a, b, c from my_view + * + * + * When all the partition columns are set a value in PARTITION clause, it is inserting into a + * static partition. It will write the query result into a static partition, + * i.e. {@code dt='2019-06-20', country='bar'}. The user specified static partitions will be told + * to the sink via {@link #setStaticPartition(Map)}. + * + * The INSERT INTO PARTITION syntax also supports dynamic partition inserts. + * + * + * INSERT INTO my_table PARTITION (dt='2019-06-20') select a, b, c, country from another_view + * + * + * When partial partition columns (prefix part of all partition columns) are set a value in + * PARTITION clause, it is writing the query result into a dynamic partition. In the above example, + * the static partition part is {@code dt='2019-06-20'} which will be told to the sink via + * {@link #setStaticPartition(Map)}. And the {@code country} is the dynamic partition which will be + * obtained from each record. + */ +@Experimental +public interface PartitionableTableSink { + + /** +* Gets the partition field names of the table. The partition field names should be sorted in +* a strict order, i.e. they have the order as specified in the PARTITION statement in DDL. +* This should be an empty set if the table is not partitioned. 
+* +* All the partition fields should exist in the {@link TableSink#getTableSchema()}. +* +* @return partition field names of the table, empty if the table is not partitioned. +*/ + List getPartitionFieldNames(); + + /** +* Sets the static partition into the {@link TableSink}. The static partition may be partial +* of all partition columns. See the class Javadoc for more details. +* +* The static partition is represented as a {@code Map} which maps from +* partition field name to partition value. The partition values are all encoded as strings, +* i.e. encoded using String.valueOf(...). For example, if we have a static partition +* {@code f0=1024, f1="foo", f2="bar"}. f0 is an integer type, f1 and f2 are string types. +* They will all be encoded as strings: "1024", "foo", "bar". And can be decoded to origi
[flink] branch master updated (a6d72fe -> c3f2ad8)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a6d72fe [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner new 6e48056 [FLINK-12805][table-api] Introduce PartitionableTableSource for partition pruning new fb111ef [FLINK-12808][table-api] Introduce OverwritableTableSink for supporting insert overwrite new c3f2ad8 [FLINK-12809][table-api] Introduce PartitionableTableSink for supporting writing data into partitions The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/table/sinks/OverwritableTableSink.java} | 21 ++-- .../flink/table/sinks/PartitionableTableSink.java | 112 + .../table/sources/PartitionableTableSource.java| 68 + 3 files changed, 191 insertions(+), 10 deletions(-) copy flink-table/{flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java => flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java} (65%) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java
[flink] 02/03: [FLINK-12808][table-api] Introduce OverwritableTableSink for supporting insert overwrite
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit fb111ef22b8f87dd25e61160678fc8857dffcfe0 Author: 云邪 AuthorDate: Wed Jun 12 11:01:20 2019 +0800 [FLINK-12808][table-api] Introduce OverwritableTableSink for supporting insert overwrite --- .../flink/table/sinks/OverwritableTableSink.java | 36 ++ 1 file changed, 36 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java new file mode 100644 index 000..6b46ef2 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/OverwritableTableSink.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.annotation.Experimental; + +/** + * A {@link TableSink} that supports INSERT OVERWRITE should implement this trait. + * INSERT OVERWRITE will overwrite any existing data in the table or partition. + * + * @see PartitionableTableSink for the definition of partition. 
+ */ +@Experimental +public interface OverwritableTableSink { + + /** +* Configures whether the insert should overwrite existing data or not. +*/ + void setOverwrite(boolean overwrite); +}
[flink] 01/03: [FLINK-12805][table-api] Introduce PartitionableTableSource for partition pruning
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6e4805673739fa1f8e85389d38ef6f3de20fc005 Author: 云邪 AuthorDate: Wed Jun 12 11:01:01 2019 +0800 [FLINK-12805][table-api] Introduce PartitionableTableSource for partition pruning --- .../table/sources/PartitionableTableSource.java| 68 ++ 1 file changed, 68 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java new file mode 100644 index 000..591ec50 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/PartitionableTableSource.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.sinks.TableSink; + +import java.util.List; +import java.util.Map; + +/** + * An interface for partitionable {@link TableSource}. 
+ * + * A {@link PartitionableTableSource} can exclude partitions from reading, which + * includes skipping the metadata. This is especially useful when there are thousands + * of partitions in a table. + * + * A partition is represented as a {@code Map<String, String>} which maps from partition + * field name to partition value. Since the map is NOT ordered, the correct order of partition + * fields should be obtained via {@link #getPartitionFieldNames()}. + */ +@Experimental +public interface PartitionableTableSource { + + /** +* Returns all the partitions of this {@link PartitionableTableSource}. +*/ + List<Map<String, String>> getPartitions(); + + /** +* Gets the partition field names of the table. The partition field names should be sorted in +* a strict order, i.e. they have the order as specified in the PARTITION statement in DDL. +* This should be an empty set if the table is not partitioned. +* +* All the partition fields should exist in the {@link TableSink#getTableSchema()}. +* +* @return partition field names of the table, empty if the table is not partitioned. +*/ + List<String> getPartitionFieldNames(); + + /** +* Applies the remaining partitions to the table source. The {@code remainingPartitions} is +* the remaining partitions of {@link #getPartitions()} after partition pruning applied. +* +* After trying to apply partition pruning, we should return a new {@link TableSource} +* instance which holds all pruned-partitions. +* +* @param remainingPartitions Remaining partitions after partition pruning applied. +* @return A new cloned instance of {@link TableSource} holds all pruned-partitions. +*/ + TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions); +}
[flink] branch master updated: [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new a6d72fe [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner a6d72fe is described below commit a6d72fed708e99404c96ffeca6ad70e08963e9cd Author: godfreyhe AuthorDate: Sat Jun 22 12:50:06 2019 +0800 [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner This closes #8832 --- .../flink/table/api/PlannerConfigOptions.java | 6 + .../plan/optimize/program/FlinkBatchProgram.scala | 22 +- .../plan/optimize/program/FlinkStreamProgram.scala | 23 +- .../table/plan/rules/FlinkBatchRuleSets.scala | 16 + .../table/plan/rules/FlinkStreamRuleSets.scala | 16 + .../logical/RewriteMultiJoinConditionRule.scala| 129 + .../table/plan/batch/sql/join/JoinReorderTest.xml | 600 +++ .../logical/RewriteMultiJoinConditionRuleTest.xml | 318 +++ .../table/plan/stream/sql/join/JoinReorderTest.xml | 635 + .../plan/batch/sql/join/JoinReorderTest.scala | 25 + .../table/plan/common/JoinReorderTestBase.scala| 233 .../FlinkAggregateInnerJoinTransposeRuleTest.scala | 2 +- .../logical/FlinkJoinPushExpressionsRuleTest.scala | 10 +- .../RewriteMultiJoinConditionRuleTest.scala| 153 + .../plan/stream/sql/join/JoinReorderTest.scala | 25 + 15 files changed, 2208 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java index 7b05df1..5e7a1a3 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java @@ -137,4 +137,10 @@ public class 
PlannerConfigOptions { .defaultValue(true) .withDescription("Allow trying to push predicate down to a FilterableTableSource. " + "the default value is true, means allow the attempt."); + + public static final ConfigOption<Boolean> SQL_OPTIMIZER_JOIN_REORDER_ENABLED = + key("sql.optimizer.join-reorder.enabled") + .defaultValue(false) + .withDescription("Enables join reorder in optimizer cbo. Default is disabled."); + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala index 090b4fb..e73fde6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.plan.optimize.program import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.PlannerConfigOptions import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.rules.FlinkBatchRuleSets @@ -33,6 +34,7 @@ object FlinkBatchProgram { val DECORRELATE = "decorrelate" val DEFAULT_REWRITE = "default_rewrite" val PREDICATE_PUSHDOWN = "predicate_pushdown" + val JOIN_REORDER = "join_reorder" val JOIN_REWRITE = "join_rewrite" val WINDOW = "window" val LOGICAL = "logical" @@ -120,7 +122,7 @@ object FlinkBatchProgram { .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES) .build(), "other predicate rewrite") -.setIterations(5).build()) +.setIterations(5).build(), "predicate rewrite") .addProgram( FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) @@ -135,6 +137,24 @@ object FlinkBatchProgram { .build(), "prune empty after predicate push 
down") .build()) +// join reorder +if (config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) { + chainedProgram.addLast( +JOIN_REORDER, +FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext] + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder +.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) +.setHepMatchOrder(HepMatchOrder.BOTTOM_UP) +
[flink] branch master updated: [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 0119afa [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join 0119afa is described below commit 0119afaf28da2cf39e772b4fb568812ad09cfa79 Author: godfreyhe AuthorDate: Sat Jun 1 17:44:31 2019 +0800 [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join This closes #8588 --- .../plan/metadata/FlinkRelMdColumnInterval.scala | 10 ++ .../plan/metadata/FlinkRelMdColumnNullCount.scala | 10 ++ .../metadata/FlinkRelMdColumnOriginNullCount.scala | 4 +- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 18 ++- .../metadata/FlinkRelMdModifiedMonotonicity.scala | 13 +- .../flink/table/plan/metadata/FlinkRelMdSize.scala | 7 +- .../plan/metadata/FlinkRelMdUniqueGroups.scala | 8 +- .../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 19 ++- .../table/plan/batch/sql/join/LookupJoinTest.scala | 1 - .../metadata/FlinkRelMdColumnIntervalTest.scala| 23 +++ .../metadata/FlinkRelMdColumnNullCountTest.scala | 14 ++ .../FlinkRelMdColumnOriginNullCountTest.scala | 10 +- .../metadata/FlinkRelMdColumnUniquenessTest.scala | 37 + .../metadata/FlinkRelMdDistinctRowCountTest.scala | 18 +++ .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 174 - .../FlinkRelMdModifiedMonotonicityTest.scala | 3 + .../FlinkRelMdPercentageOriginalRowsTest.scala | 4 + .../metadata/FlinkRelMdPopulationSizeTest.scala| 14 ++ .../plan/metadata/FlinkRelMdRowCountTest.scala | 17 ++ .../plan/metadata/FlinkRelMdSelectivityTest.scala | 12 ++ .../table/plan/metadata/FlinkRelMdSizeTest.scala | 5 + .../plan/metadata/FlinkRelMdUniqueGroupsTest.scala | 45 +- .../plan/metadata/FlinkRelMdUniqueKeysTest.scala | 23 +++ .../table/plan/metadata/MetadataTestUtil.scala | 19 ++- 24 files changed, 476 insertions(+), 32 
deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala index 20d4610..ed13607 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala @@ -101,6 +101,16 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] { } /** +* Gets interval of the given column on Snapshot. +* +* @param snapshotSnapshot RelNode +* @param mqRelMetadataQuery instance +* @param index the index of the given column +* @return interval of the given column on Snapshot. +*/ + def getColumnInterval(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): ValueInterval = null + + /** * Gets interval of the given column on Project. * * Note: Only support the simple RexNode, e.g RexInputRef. diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala index ec7a59c..673f1cd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala @@ -68,6 +68,16 @@ class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] } /** +* Gets the null count of the given column on Snapshot. +* +* @param snapshotSnapshot RelNode +* @param mqRelMetadataQuery instance +* @param index the index of the given column +* @return the null count of the given column on Snapshot. 
+*/ + def getColumnNullCount(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): JDouble = null + + /** * Gets the null count of the given column in Project. * * @param project Project RelNode diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala index 6b88b9b..61bb21b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/fl