[GitHub] [flink] Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values
Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values
URL: https://github.com/apache/flink/pull/8259#discussion_r280971043

## File path: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java

## @@ -240,12 +240,6 @@ public String filterCharacters(String input) {
 }
 private boolean numberIsNegative(Number input) {
-    try {
-        return new BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) == -1;
-    } catch (Exception e) {
-        //not all Number's can be converted to a BigDecimal, such as Infinity or NaN
-        //in this case we just say it isn't a negative number
-        return false;
-    }
+    return input.doubleValue() < 0;

Review comment:
Ah sorry, no reason! I've changed it to Double.compare(); I've also changed line #190 to use this method so the double check there works in the same way.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
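The tradeoff discussed in this diff can be seen in a small standalone sketch (class and method names below are invented for illustration, not Flink's): the old `BigDecimal` path throws on the `"NaN"`/`"Infinity"` strings produced by `Double.toString`, while `Double.compare` handles those values directly. One subtlety worth noting: `Double.compare(-0.0, 0.0)` is negative, whereas `input.doubleValue() < 0` is false for `-0.0`, so the two plain approaches disagree on negative zero.

```java
import java.math.BigDecimal;

// Illustrative comparison of the two negativity checks from the diff above.
// Class and method names are invented for this sketch.
public class NegativeCheckSketch {

    // Old approach: convert through BigDecimal; "NaN"/"Infinity" are not parseable.
    static boolean viaBigDecimal(Number input) {
        try {
            return new BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) < 0;
        } catch (NumberFormatException e) {
            // NaN and Infinity cannot be converted; treat them as non-negative
            return false;
        }
    }

    // New approach, using Double.compare as the reviewer mentions.
    static boolean viaDoubleCompare(Number input) {
        return Double.compare(input.doubleValue(), 0.0) < 0;
    }

    public static void main(String[] args) {
        System.out.println(viaDoubleCompare(-1.5));                     // true
        System.out.println(viaDoubleCompare(Double.NaN));               // false: NaN compares greater than 0
        System.out.println(viaDoubleCompare(Double.NEGATIVE_INFINITY)); // true
        System.out.println(viaBigDecimal(Double.NaN));                  // false: the parse fails
        // Subtlety: Double.compare orders -0.0 before 0.0, unlike the < operator.
        System.out.println(viaDoubleCompare(-0.0));                     // true
        System.out.println(-0.0 < 0);                                   // false
    }
}
```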
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969654

## File path: docs/dev/stream/state/schema_evolution.zh.md

## @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
-Apache Flink streaming applications are typically designed to run indefinitely or for long periods of time.
-As with all long-running services, the applications need to be updated to adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
+对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。
-This page provides an overview of how you can evolve your state type's data schema.
-The current restrictions varies across different types and state structures (`ValueState`, `ListState`, etc.).
+此页面概述了如何升级状态类型的数据结构。
+目前的限制因不同类型和状态结构而异(`ValueState`、`ListState` 等)。

Review comment:
`目前对不同类系的状态结构(`ValueState`、`ListState` 等)有不同的限制`
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r280969750 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -48,62 +47,52 @@ checkpointedState = getRuntimeContext().getListState(descriptor); {% endhighlight %} -Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write -persisted state bytes. Simply put, a registered state's schema can only be evolved if its serializer properly -supports it. This is handled transparently by serializers generated by Flink's type serialization framework -(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)). +在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。 +简而言之,状态数据结构只有在其序列化器正确支持时才能升级。 +这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl }}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。 -If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement -the serializer to support state schema evolution, please refer to -[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). -The documentation there also covers necessary internal details about the interplay between state serializers and Flink's -state backends to support state schema evolution. +如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器, +可以参考 [自定义状态序列化器]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 +本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。 -## Evolving state schema +## 升级状态数据结构 -To evolve the schema of a given state type, you would take the following steps: +为了对给定的状态类型进行升级,你需要采取以下几个步骤: - 1. Take a savepoint of your Flink streaming job. - 2. Update state types in your application (e.g., modifying your Avro type schema). - 3. 
Restore the job from the savepoint. When accessing state for the first time, Flink will assess whether or not - the schema had been changed for the state, and migrate state schema if necessary. + 1. 对 Flink 流作业进行 savepoint 操作。 + 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。 + 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。 -The process of migrating state to adapt to changed schemas happens automatically, and independently for each state. -This process is performed internally by Flink by first checking if the new serializer for the state has different -serialization schema than the previous serializer; if so, the previous serializer is used to read the state to objects, -and written back to bytes again with the new serializer. +用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。 +Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有, +那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。 -Further details about the migration process is out of the scope of this documentation; please refer to -[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). +更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 -## Supported data types for schema evolution +## 数据结构升级支持的数据类型 -Currently, schema evolution is supported only for POJO and Avro types. Therefore, if you care about schema evolution for -state, it is currently recommended to always use either Pojo or Avro for state data types. +目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。 +因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。 -There are plans to extend the support for more composite types; for more details, -please refer to [FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896). 
+我们有计划支持更多的复合类型;更多的细节可以参考 [FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。 -### POJO types +### POJO 类型 -Flink supports evolving schema of [POJO types]({{ site.baseurl }}/dev/types_serialization.html#rules-for-pojo-types), -based on the following set of rules: +Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl }}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级: - 1. Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints. - 2. New fields can be added. The new field will be initialized to the default value for its type, as -[defined by Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html). - 3. Declared fields types cannot change. - 4. Class name of the POJO type cannot change, including the namespace of the class. + 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。 Review comment: `可以删除字段`
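The four POJO evolution rules quoted in the hunk above (fields may be removed, fields may be added with Java default values, declared field types and the class name/namespace must not change) can be illustrated with a hypothetical state type — the class and fields below are invented for illustration:

```java
// Hypothetical POJO state type after evolution; the pre-evolution shape is
// shown in the comment. Illustrates the four rules from the quoted hunk.
//
// Before:
//   public class UserSession {
//       public String userId;
//       public long clicks;   // removed later -- rule 1 allows this
//       public UserSession() {}
//   }
public class UserSession {
    public String userId;    // rule 3: the declared type must not change
    public long lastSeen;    // rule 2: new field, default-initialized to 0L by Java
    public UserSession() {}  // Flink POJO types need a public no-arg constructor
}
// Rule 4: the class name and package of UserSession must stay the same.
```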
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969597

## File path: docs/dev/stream/state/schema_evolution.zh.md

## @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
-Apache Flink streaming applications are typically designed to run indefinitely or for long periods of time.
-As with all long-running services, the applications need to be updated to adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
+对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。

Review comment:
I think this line should append after the previous line.
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r280969785 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -48,62 +47,52 @@ checkpointedState = getRuntimeContext().getListState(descriptor); {% endhighlight %} -Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write -persisted state bytes. Simply put, a registered state's schema can only be evolved if its serializer properly -supports it. This is handled transparently by serializers generated by Flink's type serialization framework -(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)). +在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。 +简而言之,状态数据结构只有在其序列化器正确支持时才能升级。 +这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl }}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。 -If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement -the serializer to support state schema evolution, please refer to -[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). -The documentation there also covers necessary internal details about the interplay between state serializers and Flink's -state backends to support state schema evolution. +如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器, +可以参考 [自定义状态序列化器]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 +本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。 -## Evolving state schema +## 升级状态数据结构 -To evolve the schema of a given state type, you would take the following steps: +为了对给定的状态类型进行升级,你需要采取以下几个步骤: - 1. Take a savepoint of your Flink streaming job. - 2. Update state types in your application (e.g., modifying your Avro type schema). - 3. 
Restore the job from the savepoint. When accessing state for the first time, Flink will assess whether or not - the schema had been changed for the state, and migrate state schema if necessary. + 1. 对 Flink 流作业进行 savepoint 操作。 + 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。 + 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。 -The process of migrating state to adapt to changed schemas happens automatically, and independently for each state. -This process is performed internally by Flink by first checking if the new serializer for the state has different -serialization schema than the previous serializer; if so, the previous serializer is used to read the state to objects, -and written back to bytes again with the new serializer. +用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。 +Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有, +那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。 -Further details about the migration process is out of the scope of this documentation; please refer to -[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). +更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 -## Supported data types for schema evolution +## 数据结构升级支持的数据类型 -Currently, schema evolution is supported only for POJO and Avro types. Therefore, if you care about schema evolution for -state, it is currently recommended to always use either Pojo or Avro for state data types. +目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。 +因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。 -There are plans to extend the support for more composite types; for more details, -please refer to [FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896). 
+我们有计划支持更多的复合类型;更多的细节可以参考 [FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。 -### POJO types +### POJO 类型 -Flink supports evolving schema of [POJO types]({{ site.baseurl }}/dev/types_serialization.html#rules-for-pojo-types), -based on the following set of rules: +Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl }}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级: - 1. Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints. - 2. New fields can be added. The new field will be initialized to the default value for its type, as -[defined by Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html). - 3. Declared fields types cannot change. - 4. Class name of the POJO type cannot change, including the namespace of the class. + 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。 + 2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。 + 3. 不可以修改字段的声明类型。 + 4. 不可以改变 POJO 类型的类名,包括类的命名空间。 -Note that the schema of POJO type state can only be evolved when restoring from a previous savepoint with Flink versions -newer than 1.8.0. When restoring
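The migration flow described in the quoted paragraph — the previous serializer reads persisted bytes back into objects, and the new serializer writes them out again — can be sketched with a stand-in interface. This is not Flink's actual `TypeSerializer` API; the `Serializer` interface and the two toy string schemas below are invented to show the shape of the idea:

```java
import java.nio.charset.StandardCharsets;

// Toy sketch of schema migration: bytes written by the old serializer are
// read back into objects and re-written with the new serializer.
public class MigrationSketch {

    interface Serializer<T> {  // stand-in, not Flink's TypeSerializer
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    static <T> byte[] migrate(byte[] oldBytes, Serializer<T> oldSer, Serializer<T> newSer) {
        T value = oldSer.deserialize(oldBytes);  // decode with the previous schema
        return newSer.serialize(value);          // encode with the new schema
    }

    // "Old schema": plain UTF-8 text. "New schema": text prefixed with a version tag.
    static final Serializer<String> OLD = new Serializer<String>() {
        public byte[] serialize(String v) { return v.getBytes(StandardCharsets.UTF_8); }
        public String deserialize(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
    };
    static final Serializer<String> NEW = new Serializer<String>() {
        public byte[] serialize(String v) { return ("v2|" + v).getBytes(StandardCharsets.UTF_8); }
        public String deserialize(byte[] b) {
            return new String(b, StandardCharsets.UTF_8).substring(3); // drop the "v2|" tag
        }
    };

    public static void main(String[] args) {
        byte[] old = OLD.serialize("hello");
        byte[] migrated = migrate(old, OLD, NEW);
        System.out.println(NEW.deserialize(migrated)); // hello
    }
}
```

In Flink itself this check-and-rewrite happens lazily, per state, on first access after restore, as the quoted text says.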
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969642

## File path: docs/dev/stream/state/schema_evolution.zh.md

## @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
-Apache Flink streaming applications are typically designed to run indefinitely or for long periods of time.
-As with all long-running services, the applications need to be updated to adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
+对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。
-This page provides an overview of how you can evolve your state type's data schema.
-The current restrictions varies across different types and state structures (`ValueState`, `ListState`, etc.).
+此页面概述了如何升级状态类型的数据结构。

Review comment:
Maybe we should not translate `data schema`[1]

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r280969812 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -48,62 +47,52 @@ checkpointedState = getRuntimeContext().getListState(descriptor); {% endhighlight %} -Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write -persisted state bytes. Simply put, a registered state's schema can only be evolved if its serializer properly -supports it. This is handled transparently by serializers generated by Flink's type serialization framework -(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)). +在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。 +简而言之,状态数据结构只有在其序列化器正确支持时才能升级。 +这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl }}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。 -If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement -the serializer to support state schema evolution, please refer to -[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). -The documentation there also covers necessary internal details about the interplay between state serializers and Flink's -state backends to support state schema evolution. +如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器, +可以参考 [自定义状态序列化器]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 +本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。 -## Evolving state schema +## 升级状态数据结构 -To evolve the schema of a given state type, you would take the following steps: +为了对给定的状态类型进行升级,你需要采取以下几个步骤: - 1. Take a savepoint of your Flink streaming job. - 2. Update state types in your application (e.g., modifying your Avro type schema). - 3. 
Restore the job from the savepoint. When accessing state for the first time, Flink will assess whether or not - the schema had been changed for the state, and migrate state schema if necessary. + 1. 对 Flink 流作业进行 savepoint 操作。 + 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。 + 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。 -The process of migrating state to adapt to changed schemas happens automatically, and independently for each state. -This process is performed internally by Flink by first checking if the new serializer for the state has different -serialization schema than the previous serializer; if so, the previous serializer is used to read the state to objects, -and written back to bytes again with the new serializer. +用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。 +Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有, +那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。 -Further details about the migration process is out of the scope of this documentation; please refer to -[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). +更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 -## Supported data types for schema evolution +## 数据结构升级支持的数据类型 -Currently, schema evolution is supported only for POJO and Avro types. Therefore, if you care about schema evolution for -state, it is currently recommended to always use either Pojo or Avro for state data types. +目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。 Review comment: `目前,仅支持 POJO 和 Avro 类型的 schema 升级`
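On the Avro side of the "POJO and Avro only" statement above, evolution works through Avro's schema resolution: a reader schema may add fields as long as they carry a default value. A hypothetical before/after record schema (the `User` record and its fields are invented for illustration):

```java
// Hypothetical Avro record schema before and after evolution. The evolved
// schema adds an optional field with a default, which Avro schema resolution
// requires in order to read data written with the old schema.
public class AvroEvolutionSketch {
    static final String V1 =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"}]}";

    static final String V2 =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

    public static void main(String[] args) {
        // The new field is nullable and defaulted, so V2 can read V1 data.
        System.out.println(V2.contains("\"default\"")); // true
    }
}
```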
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r280969840 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -48,62 +47,52 @@ checkpointedState = getRuntimeContext().getListState(descriptor); {% endhighlight %} -Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write -persisted state bytes. Simply put, a registered state's schema can only be evolved if its serializer properly -supports it. This is handled transparently by serializers generated by Flink's type serialization framework -(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)). +在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。 +简而言之,状态数据结构只有在其序列化器正确支持时才能升级。 +这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl }}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。 -If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement -the serializer to support state schema evolution, please refer to -[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). -The documentation there also covers necessary internal details about the interplay between state serializers and Flink's -state backends to support state schema evolution. +如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器, +可以参考 [自定义状态序列化器]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 +本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。 -## Evolving state schema +## 升级状态数据结构 -To evolve the schema of a given state type, you would take the following steps: +为了对给定的状态类型进行升级,你需要采取以下几个步骤: - 1. Take a savepoint of your Flink streaming job. - 2. Update state types in your application (e.g., modifying your Avro type schema). - 3. 
Restore the job from the savepoint. When accessing state for the first time, Flink will assess whether or not - the schema had been changed for the state, and migrate state schema if necessary. + 1. 对 Flink 流作业进行 savepoint 操作。 + 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。 + 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。 Review comment: `从 savepoint 处重启作业` --> `从 savepoint 恢复作业` `当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。` --> `当第一次访问状态数据时,Flink 会判断状态数据 schema 是否已经改变,并进行必要的迁移。` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
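The three migration steps quoted in the diff above (savepoint, update the state type, restore) can be illustrated with a small sketch. This is not Flink code: it only shows, under Avro-style schema-resolution assumptions, what "migrate state schema if necessary" means for a record written under an old schema when the new schema adds a field with a default value. All names here (`StateMigrationSketch`, the `"tag"` field, the `"none"` default) are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only, not Flink's migration code. Old schema: {id}.
// New schema: {id, tag} where "tag" declares the default "none".
// On first read after restore, the missing field is filled with its default.
class StateMigrationSketch {
    static Map<String, String> migrate(Map<String, String> oldRecord) {
        Map<String, String> upgraded = new HashMap<>(oldRecord);
        // the field added by the new schema receives its declared default
        upgraded.putIfAbsent("tag", "none");
        return upgraded;
    }
}
```

Fields already present in the old record are left untouched; only the newly added field is filled in, which mirrors how schema-resolution rules treat reader-schema defaults.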
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r280969524 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -25,17 +25,16 @@ under the License. * ToC {:toc} -Apache Flink streaming applications are typically designed to run indefinitely or for long periods of time. -As with all long-running services, the applications need to be updated to adapt to changing requirements. -This goes the same for data schemas that the applications work against; they evolve along with the application. +Apache Flink 流应用通常被设计为永远或者长时间运行。 +与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。 Review comment: I think this sentence `与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。` can be improved. Something like `与所有长期运行的服务一样,应用程序需要随着业务的迭代而进行调整。应用所处理的数据 schema 也会随之变化`, or you can come up with something better.
[GitHub] [flink] bowenli86 commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
bowenli86 commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8337#discussion_r280969694 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; + +import org.apache.hadoop.hive.metastore.api.Database; + +/** + * Utils to convert meta objects between Flink and Hive for HiveCatalog. + */ +public class HiveCatalogUtil { + + private HiveCatalogUtil() { + } + + // -- Utils -- + + /** +* Creates a Hive database from CatalogDatabase. +*/ + public static Database createHiveDatabase(String dbName, CatalogDatabase db) { + HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) db; + + return new Database( Review comment: I got confused between 'description' and 'comment'. I've created https://github.com/apache/flink/pull/8340 to complete the javadoc for description. Will address "comment" This is an automated message from the Apache Git Service. 
[GitHub] [flink] flinkbot commented on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces
flinkbot commented on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces URL: https://github.com/apache/flink/pull/8340#issuecomment-489295083 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 commented on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces
bowenli86 commented on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces URL: https://github.com/apache/flink/pull/8340#issuecomment-489295088 cc @xuefuz
[jira] [Updated] (FLINK-12395) Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12395: --- Labels: pull-request-available (was: ) > Add more detailed javadoc for getDescription() and getDetailedDescription() > in catalog object interfaces > > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > We need to add more javadoc for {{getDescription()}} and > {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which > cases they will be called, and how we expect developers to implement them -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 opened a new pull request #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces
bowenli86 opened a new pull request #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces URL: https://github.com/apache/flink/pull/8340 ## What is the purpose of the change This PR adds more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces, to help developers better understand when these APIs are expected to be called. ## Brief change log - Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12395) Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12395: - Summary: Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces (was: Document description and detailed description in catalog object interfaces) > Add more detailed javadoc for getDescription() and getDetailedDescription() > in catalog object interfaces > > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > We need to add more javadoc for {{getDescription()}} and > {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which > cases they will be called, and how we expect developers to implement them -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12395) Document description and detailed description in catalog object interfaces
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12395: - Summary: Document description and detailed description in catalog object interfaces (was: Document description and detailed description in CatalogBaseTable) > Document description and detailed description in catalog object interfaces > -- > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > We need to add more javadoc for {{getDescription()}} and > {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which > cases they will be called, and how we expect developers to implement them -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12395) Document description and detailed description in catalog object interfaces
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-12395: Assignee: Bowen Li > Document description and detailed description in catalog object interfaces > -- > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > We need to add more javadoc for {{getDescription()}} and > {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which > cases they will be called, and how we expect developers to implement them -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8337#discussion_r280969214 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; + +import org.apache.hadoop.hive.metastore.api.Database; + +/** + * Utils to convert meta objects between Flink and Hive for HiveCatalog. + */ +public class HiveCatalogUtil { + + private HiveCatalogUtil() { + } + + // -- Utils -- + + /** +* Creates a Hive database from CatalogDatabase. +*/ + public static Database createHiveDatabase(String dbName, CatalogDatabase db) { + HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) db; + + return new Database( Review comment: Just curious, is there any use for "comment"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8337#discussion_r280969532 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * A catalog implementation for Hive. 
+ */ +public class HiveCatalog extends HiveCatalogBase { + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + super(catalogName, hivemetastoreURI); + + LOG.info("Created HiveCatalog '{}'", catalogName); + } + + public HiveCatalog(String catalogName, HiveConf hiveConf) { + super(catalogName, hiveConf); + + LOG.info("Created HiveCatalog '{}'", catalogName); + } + + // -- databases -- + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + Database hiveDb; + + try { + hiveDb = client.getDatabase(databaseName); + } catch (NoSuchObjectException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get database %s from %s", databaseName, catalogName), e); + } + + return new HiveCatalogDatabase( + hiveDb.getParameters(), hiveDb.getLocationUri(), hiveDb.getDescription()); + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + try { + client.createDatabase(HiveCatalogUtil.createHiveDatabase(name, database)); Review comment: I think the logics related to hive metastore client can be shared. For example, createDatabase() only needs a hive database object, which might be created differently in the two catalogs. However, once such a object is created, the remaining logic is the same and can be shared. This is an automated message from the Apache Git
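The refactoring xuefuz suggests above — keep the metastore-client call in the base class and let each catalog flavor supply only its own conversion — is the classic template-method shape. A minimal self-contained sketch follows; the stub types and names (`CatalogBaseSketch`, `HiveCatalogSketch`, `toHiveDb`, the in-memory `client` map) are hypothetical stand-ins, not Flink's actual `HiveCatalogBase` API.

```java
import java.util.HashMap;
import java.util.Map;

// Template-method sketch: createDatabase() is written once in the base
// class; subclasses only decide how their database object is converted.
abstract class CatalogBaseSketch {
    // stand-in for the Hive metastore's Database object
    static class HiveDb {
        final String name;
        HiveDb(String name) { this.name = name; }
    }

    // stand-in for the shared Hive metastore client
    protected final Map<String, HiveDb> client = new HashMap<>();

    // subclass-specific: how a catalog database maps to a Hive Database
    protected abstract HiveDb toHiveDb(String name);

    // shared logic, identical for the generic and the Hive catalog
    public boolean createDatabase(String name, boolean ignoreIfExists) {
        if (client.containsKey(name)) {
            if (!ignoreIfExists) {
                throw new IllegalStateException("database already exists: " + name);
            }
            return false;
        }
        client.put(name, toHiveDb(name));
        return true;
    }
}

class HiveCatalogSketch extends CatalogBaseSketch {
    @Override
    protected HiveDb toHiveDb(String name) {
        return new HiveDb("hive:" + name);
    }
}
```

With this split, exception mapping and the `ignoreIfExists` handling live in one place, and adding another catalog flavor means overriding only the conversion hook.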
[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8337#discussion_r280969550 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * A catalog implementation for Hive. 
+ */ +public class HiveCatalog extends HiveCatalogBase { + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + super(catalogName, hivemetastoreURI); + + LOG.info("Created HiveCatalog '{}'", catalogName); + } + + public HiveCatalog(String catalogName, HiveConf hiveConf) { + super(catalogName, hiveConf); + + LOG.info("Created HiveCatalog '{}'", catalogName); + } + + // -- databases -- + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + Database hiveDb; + + try { + hiveDb = client.getDatabase(databaseName); + } catch (NoSuchObjectException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get database %s from %s", databaseName, catalogName), e); + } + + return new HiveCatalogDatabase( + hiveDb.getParameters(), hiveDb.getLocationUri(), hiveDb.getDescription()); + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + try { + client.createDatabase(HiveCatalogUtil.createHiveDatabase(name, database)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(catalogName, name); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create database %s", name), e); + } + } + + @Override +
[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8337#discussion_r280968990 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.util.StringUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A hive catalog database implementation. + */ +public class HiveCatalogDatabase implements CatalogDatabase { + // Property of the database + private final Map properties; + // HDFS path of the database + private String location; + // Comment of the database + private String comment = "This is a generic catalog database."; Review comment: This shouldn't be a "generic" catalog, right? This is an automated message from the Apache Git Service. 
[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8337#discussion_r280969275 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.util.StringUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A hive catalog database implementation. 
+ */ +public class HiveCatalogDatabase implements CatalogDatabase { + // Property of the database + private final Map properties; + // HDFS path of the database + private String location; + // Comment of the database + private String comment = "This is a generic catalog database."; + + public HiveCatalogDatabase(Map properties) { Review comment: In other meta classes, properties are optional, whereas here it's required. Could we make it consistent? If all three variables are optional, we should have a default constructor.
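The consistency xuefuz asks for — all fields optional, with a default constructor — is commonly done in Java by constructor chaining. A hedged sketch, assuming the review's three-field shape; the class name `DatabaseMetaSketch` and its accessors are illustrative, not the reviewed Flink class:

```java
import java.util.Collections;
import java.util.Map;

// Every field is optional: each shorter constructor delegates to the full
// one with a default, so there is exactly one place that assigns state.
class DatabaseMetaSketch {
    private final Map<String, String> properties;
    private final String comment;

    DatabaseMetaSketch() {
        this(Collections.emptyMap(), "");
    }

    DatabaseMetaSketch(Map<String, String> properties) {
        this(properties, "");
    }

    DatabaseMetaSketch(Map<String, String> properties, String comment) {
        this.properties = properties;
        this.comment = comment;
    }

    String getComment() { return comment; }
    int propertyCount() { return properties.size(); }
}
```

Chaining keeps the defaults in one spot, so a caller can pass any subset of arguments and still get a fully initialized object.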
[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8337#discussion_r280969459 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * A catalog implementation for Hive. 
+ */
+public class HiveCatalog extends HiveCatalogBase {
+	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+
+	public HiveCatalog(String catalogName, String hivemetastoreURI) {
+		super(catalogName, hivemetastoreURI);
+
+		LOG.info("Created HiveCatalog '{}'", catalogName);
+	}
+
+	public HiveCatalog(String catalogName, HiveConf hiveConf) {
+		super(catalogName, hiveConf);
+
+		LOG.info("Created HiveCatalog '{}'", catalogName);
+	}
+
+	// -- databases --
+
+	@Override
+	public CatalogDatabase getDatabase(String databaseName)
+			throws DatabaseNotExistException, CatalogException {
+		Database hiveDb;
+
+		try {

Review comment: Maybe we should extract this part to the base class, as the logic is the same for generic and hive.
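The extraction the reviewer suggests is essentially the template-method pattern: the base class owns the lookup and the not-found handling, while each subclass (generic vs. Hive) supplies only the conversion step. Below is a self-contained sketch under assumed names; `fetchFromMetastore`, `convertDatabase`, and the stub classes are hypothetical stand-ins, not Flink's actual API.

```java
public class GetDatabaseSketch {

    static class DatabaseNotExistException extends RuntimeException {
        DatabaseNotExistException(String name) {
            super("Database " + name + " does not exist");
        }
    }

    // Stand-in for the Hive metastore's Database object.
    static class HiveDb {
        final String name;
        HiveDb(String name) { this.name = name; }
    }

    abstract static class HiveCatalogBase {
        // Shared logic lives here: fetch from the metastore, translate "not found".
        public final String getDatabase(String databaseName) {
            HiveDb hiveDb = fetchFromMetastore(databaseName);
            if (hiveDb == null) {
                throw new DatabaseNotExistException(databaseName);
            }
            return convertDatabase(hiveDb);
        }

        // Hypothetical metastore call; returns null when the database is absent.
        protected abstract HiveDb fetchFromMetastore(String databaseName);

        // Only this step differs between the generic and Hive catalogs.
        protected abstract String convertDatabase(HiveDb hiveDb);
    }

    static class HiveCatalogStub extends HiveCatalogBase {
        @Override
        protected HiveDb fetchFromMetastore(String databaseName) {
            return "default".equals(databaseName) ? new HiveDb(databaseName) : null;
        }

        @Override
        protected String convertDatabase(HiveDb hiveDb) {
            return "hive-database:" + hiveDb.name;
        }
    }

    public static void main(String[] args) {
        HiveCatalogBase catalog = new HiveCatalogStub();
        if (!"hive-database:default".equals(catalog.getDatabase("default"))) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```

The duplicated try/catch around the metastore call then exists in exactly one place.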
[jira] [Comment Edited] (FLINK-12395) Document description and detailed description in CatalogBaseTable
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832986#comment-16832986 ] Bowen Li edited comment on FLINK-12395 at 5/4/19 5:05 AM: -- Agree to improve the javadoc, as I've also been confused... I'll add javadoc describing in which cases they are expected to be called. was (Author: phoenixjiangnan): Agree to improve the javadoc, as I've also been confused... I'll add javadoc describing in which cases they are expected to be called. I will put "comment" into the table properties map > Document description and detailed description in CatalogBaseTable > - > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > We need to add more javadoc for {{getDescription()}} and > {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which > cases they will be called, and how we expect developers to implement them -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11637) Translate "Checkpoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832987#comment-16832987 ] Congxian Qiu(klion26) commented on FLINK-11637: --- [~lsy] I'm not working on this now, if you want to contribute, please feel free to assign it to yourself. > Translate "Checkpoints" page into Chinese > - > > Key: FLINK-11637 > URL: https://issues.apache.org/jira/browse/FLINK-11637 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Priority: Major > > doc locates in flink/docs/ops/state/checkpoints.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12395) Document description and detailed description in CatalogBaseTable
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12395: - Description: We need to add more javadoc for {{getDescription()}} and {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which cases they will be called, and how we expect developers to implement them (was: CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} API, which don't seem to make much sense. I'm not sure what's the use case of detailed description, and how should users specify it in SQL and table api. Probably should remove detailed description and rename "description" to "comment". Besides, for simplicity, we should consider just treating "comment" as a property in properties map. ) > Document description and detailed description in CatalogBaseTable > - > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > We need to add more javadoc for {{getDescription()}} and > {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which > cases they will be called, and how we expect developers to implement them -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12395) Reconcile description and detailed description in CatalogBaseTable
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832986#comment-16832986 ] Bowen Li commented on FLINK-12395: -- Agree to improve the javadoc, as I've also been confused... I'll add javadoc describing in which cases they are expected to be called. I will put "comment" into the table properties map > Reconcile description and detailed description in CatalogBaseTable > -- > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} > API, which don't seem to make much sense. I'm not sure what's the use case of > detailed description, and how should users specify it in SQL and table api. > Probably should remove detailed description and rename "description" to > "comment". > Besides, for simplicity, we should consider just treating "comment" as a > property in properties map. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12395) Document description and detailed description in CatalogBaseTable
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12395: - Summary: Document description and detailed description in CatalogBaseTable (was: Reconcile description and detailed description in CatalogBaseTable) > Document description and detailed description in CatalogBaseTable > - > > Key: FLINK-12395 > URL: https://issues.apache.org/jira/browse/FLINK-12395 > Project: Flink > Issue Type: Sub-task >Reporter: Bowen Li >Priority: Major > Fix For: 1.9.0 > > > CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} > API, which don't seem to make much sense. I'm not sure what's the use case of > detailed description, and how should users specify it in SQL and table api. > Probably should remove detailed description and rename "description" to > "comment". > Besides, for simplicity, we should consider just treating "comment" as a > property in properties map. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12240) Support view related operations in GenericHiveMetastoreCatalog
[ https://issues.apache.org/jira/browse/FLINK-12240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12240: --- Labels: pull-request-available (was: ) > Support view related operations in GenericHiveMetastoreCatalog > -- > > Key: FLINK-12240 > URL: https://issues.apache.org/jira/browse/FLINK-12240 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > Support view related operations in GenericHiveMetastoreCatalog, which > implements ReadableWritableCatalog API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog
flinkbot commented on issue #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8339#issuecomment-489287613 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 opened a new pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog
bowenli86 opened a new pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8339 ## What is the purpose of the change This PR added support for views in GenericHiveMetastoreCatalog, operations include create, drop, alter, rename, list. ## Brief change log - Implemented view related APIs in GenericHiveMetastoreCatalog - Reused and moved view related unit tests from GenericInMemoryCatalog to CatalogTestBase ## Verifying this change This change added tests and can be verified as follows: - Tests moved to CatalogTestBase ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This PR depends on https://github.com/apache/flink/pull/8329 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8338: [BP-1.6][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
flinkbot commented on issue #8338: [BP-1.6][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators URL: https://github.com/apache/flink/pull/8338#issuecomment-489284633 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
[GitHub] [flink] klion26 opened a new pull request #8338: [BP-1.6][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
klion26 opened a new pull request #8338: [BP-1.6][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators URL: https://github.com/apache/flink/pull/8338 ## What is the purpose of the change cherry-pick #8263 to release-1.6 manually. ## Verifying this change This change added tests and can be verified as follows: `flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java#testMultipleStatefulOperatorChainedSnapshotAndRestore` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) @StefanRRichter This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12399) FilterableTableSource does not use filters on job run
Josh Bradt created FLINK-12399:
----------------------------------

             Summary: FilterableTableSource does not use filters on job run
                 Key: FLINK-12399
                 URL: https://issues.apache.org/jira/browse/FLINK-12399
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.8.0
            Reporter: Josh Bradt
         Attachments: flink-filter-bug.tar.gz

As discussed [on the mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], there appears to be a bug where a job that uses a custom FilterableTableSource does not keep the filters that were pushed down into the table source. More specifically, the table source does receive filters via applyPredicate, and a new table source with those filters is returned, but the final job graph appears to use the original table source, which does not contain any filters. I attached a minimal example program to this ticket. The custom table source is as follows:

{code:java}
public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);

    private final Filter[] filters;
    private final FilterConverter converter = new FilterConverter();

    public CustomTableSource() {
        this(null);
    }

    private CustomTableSource(Filter[] filters) {
        this.filters = filters;
    }

    @Override
    public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
        if (filters == null) {
            LOG.info(" No filters defined ");
        } else {
            LOG.info(" Found filters ");
            for (Filter filter : filters) {
                LOG.info("FILTER: {}", filter);
            }
        }
        return execEnv.fromCollection(allModels());
    }

    @Override
    public TableSource<Model> applyPredicate(List<Expression> predicates) {
        LOG.info("Applying predicates");
        List<Filter> acceptedFilters = new ArrayList<>();
        for (final Expression predicate : predicates) {
            converter.convert(predicate).ifPresent(acceptedFilters::add);
        }
        return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public TypeInformation<Model> getReturnType() {
        return TypeInformation.of(Model.class);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }

    private List<Model> allModels() {
        List<Model> models = new ArrayList<>();
        models.add(new Model(1, 2, 3, 4));
        models.add(new Model(10, 11, 12, 13));
        models.add(new Model(20, 21, 22, 23));
        return models;
    }
}
{code}

When run, it logs

{noformat}
15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource - No filters defined
{noformat}

which appears to indicate that although filters are getting pushed down, the final job does not use them.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
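The failure mode described above follows from the immutable push-down contract: `applyPredicate` returns a new source carrying the accepted filters, so the planner must swap in the returned instance. If it keeps the original, the filters are silently lost. A minimal self-contained sketch of that contract (the `Source` class below is made up for illustration, not Flink's API):

```java
import java.util.ArrayList;
import java.util.List;

public class PushDownSketch {
    static class Source {
        final List<String> filters;

        Source(List<String> filters) { this.filters = filters; }

        // Immutable-style push-down: the accepted filters live only in the copy.
        Source applyPredicate(List<String> predicates) {
            return new Source(new ArrayList<>(predicates));
        }

        boolean isFilterPushedDown() { return filters != null; }
    }

    public static void main(String[] args) {
        Source original = new Source(null);
        Source optimized = original.applyPredicate(List.of("a > 1"));

        // The copy carries the filters; the original still has none, which is
        // exactly why a planner holding onto `original` produces the bug above.
        if (!optimized.isFilterPushedDown()) throw new AssertionError();
        if (original.isFilterPushedDown()) throw new AssertionError();
        System.out.println("ok");
    }
}
```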
[jira] [Created] (FLINK-12398) Support partitioned view in catalog API
Bowen Li created FLINK-12398:
--------------------------------

             Summary: Support partitioned view in catalog API
                 Key: FLINK-12398
                 URL: https://issues.apache.org/jira/browse/FLINK-12398
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / API
            Reporter: Bowen Li
            Assignee: Bowen Li

Partitioned views are not rare in common databases:

SQL Server: https://docs.microsoft.com/en-us/sql/t-sql/statements/create-view-transact-sql?view=sql-server-2017#partitioned-views
Oracle: https://docs.oracle.com/cd/A57673_01/DOC/server/doc/A48506/partview.htm
Hive: https://cwiki.apache.org/confluence/display/Hive/PartitionedViews

The work may include moving {{isPartitioned()}} and {{getPartitionKeys()}} from {{CatalogTable}} to {{CatalogBaseTable}}, and other changes.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12397) Drop flink-shaded-asm-5
Chesnay Schepler created FLINK-12397: Summary: Drop flink-shaded-asm-5 Key: FLINK-12397 URL: https://issues.apache.org/jira/browse/FLINK-12397 Project: Flink Issue Type: Improvement Components: BuildSystem / Shaded Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: shaded-7.0 With FLINK-12390 being resolved there's no need to further maintain flink-shaded-asm-5, so let's remove it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12390) Fully migrate to asm6
[ https://issues.apache.org/jira/browse/FLINK-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-12390. Resolution: Fixed master: 9975e0393a9e09dbde21ca61f1a5751cc5934411 > Fully migrate to asm6 > - > > Key: FLINK-12390 > URL: https://issues.apache.org/jira/browse/FLINK-12390 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > We currently use a mix of asm5/asm6, let's migrate completely so we can drop > a dependency and remove a module from flink-shaded. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12396) KafkaITCase.testOneSourceMultiplePartitions doesn't fail properly
Bowen Li created FLINK-12396: Summary: KafkaITCase.testOneSourceMultiplePartitions doesn't fail properly Key: FLINK-12396 URL: https://issues.apache.org/jira/browse/FLINK-12396 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.9.0 Reporter: Bowen Li Fix For: 1.9.0 https://api.travis-ci.org/v3/job/527599974/log.txt In the log, we can see that KafkaITCase.testOneSourceMultiplePartitions failed, but it kept running and doing all the snapshot, which caused the built to timeout. {code:java} 05:00:38,896 INFO org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink - Snapshot of counter 4800 at checkpoint 115 05:00:39,050 ERROR org.apache.flink.streaming.connectors.kafka.KafkaITCase - Test testOneSourceMultiplePartitions(org.apache.flink.streaming.connectors.kafka.KafkaITCase) failed with: org.junit.runners.model.TestTimedOutException: test timed out after 6 milliseconds at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621) at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79) at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35) at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneSourceMultiplePartitionsExactlyOnceTest(KafkaConsumerTestBase.java:924) at org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneSourceMultiplePartitions(KafkaITCase.java:102) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) 05:00:39,057 INFO org.apache.flink.streaming.connectors.kafka.KafkaITCase - Test testCancelingFullTopic(org.apache.flink.streaming.connectors.kafka.KafkaITCase) is running. 05:00:39,396 INFO org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink - Snapshot of counter 4800 at checkpoint 116 05:00:39,896 INFO org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink - Snapshot of counter 4800 at checkpoint 117 05:00:40,396 INFO org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink - Snapshot of counter 4800 at checkpoint 118 05:00:40,896 INFO org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink - Snapshot of counter 4800 at checkpoint 119 05:00:41,396 INFO org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink - Snapshot of counter 4800 at checkpoint 120 {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol merged pull request #8332: [FLINK-12390][build] Fully migrate to flink-shaded-asm-6
zentol merged pull request #8332: [FLINK-12390][build] Fully migrate to flink-shaded-asm-6 URL: https://github.com/apache/flink/pull/8332
[GitHub] [flink] bowenli86 commented on issue #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on issue #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#issuecomment-489180854 Thanks @dawidwys for the feedback! Let's sync up on Monday offline as planned.
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r280866583

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala

@@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
     rels.map(_.asInstanceOf[ExecNode[_, _]])
   }

+  /**
+    * Register an [[ReadableCatalog]] under a unique name.
+    *
+    * @param name the name under which the catalog will be registered
+    * @param catalog the catalog to register
+    * @throws CatalogAlreadyExistsException thrown if the catalog already exists
+    */
+  @throws[CatalogAlreadyExistsException]
+  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+    catalogManager.registerCatalog(name, catalog)
+  }
+
+  /**
+    * Get a registered [[ReadableCatalog]].
+    *
+    * @param catalogName name of the catalog to get
+    * @return the requested catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def getCatalog(catalogName: String): ReadableCatalog = {
+    catalogManager.getCatalog(catalogName)
+  }
+
+  /**
+    * Get the current catalog.
+    *
+    * @return the current catalog in CatalogManager
+    */
+  def getCurrentCatalog(): ReadableCatalog = {
+    catalogManager.getCurrentCatalog
+  }
+
+  /**
+    * Get the current database name.
+    *
+    * @return the current database of the current catalog
+    */
+  def getCurrentDatabaseName(): String = {
+    catalogManager.getCurrentCatalog.getCurrentDatabase
+  }
+
+  /**
+    * Set the current catalog.
+    *
+    * @param name name of the catalog to set as current catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def setCurrentCatalog(name: String): Unit = {
+    catalogManager.setCurrentCatalog(name)
+  }
+
+  /**
+    * Set the current catalog and current database.
+    *
+    * @param catalogName name of the catalog to set as current catalog
+    * @param databaseName name of the database to set as current database
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    * @throws DatabaseNotExistException thrown if the database doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  @throws[DatabaseNotExistException]
+  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+    catalogManager.setCurrentCatalog(catalogName)
+    catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)

Review comment: I agree the "current database" concept is now confusing, because it actually represents two things: 1) the database of the session, if its catalog is the current one; and 2) the database that becomes current when users switch to its catalog, which is the "default_db" of each catalog unless users override it. That's why CatalogManager needs to keep a map if we want to extract it out. We separated these two concepts in Blink, and we could probably do the same in Flink by replacing `get/setCurrentDatabase` with `String getDefaultDatabase()` (if that's the way to go, we should do it in a separate JIRA/PR). IMHO this is not critical at the moment, as it's only a minor usability concern from the users' perspective. I can remove the parts related to `setCurrentDatabase` from this PR, and we can let the idea bake for more time.
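If the "current database" bookkeeping were extracted into CatalogManager as the comment suggests, the manager would need a per-catalog map from catalog name to that catalog's current (or default) database. The sketch below illustrates the idea; all names are made up for illustration and this is not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class CurrentDatabaseSketch {
    private static final String DEFAULT_DB = "default_db";

    // Per-catalog current database, owned by the manager instead of each catalog.
    private final Map<String, String> currentDbPerCatalog = new HashMap<>();
    private String currentCatalog;

    public void setCurrentCatalog(String catalogName) {
        currentCatalog = catalogName;
        // Fall back to the default database the first time we switch to a catalog.
        currentDbPerCatalog.putIfAbsent(catalogName, DEFAULT_DB);
    }

    public void setCurrentDatabase(String catalogName, String databaseName) {
        setCurrentCatalog(catalogName);
        currentDbPerCatalog.put(catalogName, databaseName);
    }

    public String getCurrentDatabaseName() {
        return currentDbPerCatalog.get(currentCatalog);
    }

    public static void main(String[] args) {
        CurrentDatabaseSketch mgr = new CurrentDatabaseSketch();
        mgr.setCurrentCatalog("hive");
        if (!"default_db".equals(mgr.getCurrentDatabaseName())) throw new AssertionError();

        mgr.setCurrentDatabase("hive", "sales");
        mgr.setCurrentCatalog("builtin");
        mgr.setCurrentCatalog("hive");
        // The previously chosen database for "hive" is remembered across switches.
        if (!"sales".equals(mgr.getCurrentDatabaseName())) throw new AssertionError();
        System.out.println("ok");
    }
}
```

This captures both meanings the comment distinguishes: the session's database for the current catalog, and the database users land in when switching catalogs.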
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r280858643 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.util.StringUtils; + +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.schema.SchemaPlus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager implementation for Flink. + * TODO: [FLINK-11275] Decouple CatalogManager with Calcite + * Idealy FlinkCatalogManager should be in flink-table-api-java module. 
+ * But it currently depends on Calcite, a dependency that flink-table-api-java doesn't have right now, + * so we temporarily put FlinkCatalogManager in flink-table-planner-blink. + */ +public class FlinkCatalogManager implements CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalogManager.class); + + public static final String BUILTIN_CATALOG_NAME = "builtin"; + + // The catalog to hold all registered and translated tables + // We disable caching here to prevent side effects + private CalciteSchema internalSchema = CalciteSchema.createRootSchema(false, false); + private SchemaPlus rootSchema = internalSchema.plus(); + + // A map between names and catalogs. + private Map<String, Catalog> catalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + public FlinkCatalogManager() { + LOG.info("Initializing FlinkCatalogManager"); + catalogs = new HashMap<>(); + + GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME); Review comment: I think access control/authorization is orthogonal to this problem. I'd argue that we should probably allow some types of catalog to be read-only: we don't enter data into them, we only read data from them. Take the Confluent Registry as an example: it has really nice features such as a fancy UI, schema evolution, and compatibility checks. If users are already using it to manage their Kafka schemas, why would they want to use something else that doesn't support all these features, or doesn't support them natively? Supporting writes in Flink's Registry Catalog would be in vain, IMHO. The `registerTable` is part of `TableEnvironment` in the example. With the use case above in mind, having only a `Catalog` interface and letting a catalog implement 13+ methods (and the number can grow) that it doesn't support seems like a poor design.
I wonder whether we should add a `CatalogManager#getReadableWritableCatalog()` API, which can perform the checks inside so that `TableEnvironment` won't need to worry about them anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
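The split the comment argues for can be sketched in plain Java. This is an illustrative sketch only — `ReadableCatalog`, `ReadableWritableCatalog`, and `CatalogManagerSketch` are hypothetical stand-ins for the proposed design, not Flink's actual interfaces:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: a catalog that only supports reads (e.g. backed by a schema registry).
interface ReadableCatalog {
    boolean tableExists(String name);
}

// Hypothetical: a catalog that additionally supports writes.
interface ReadableWritableCatalog extends ReadableCatalog {
    void createTable(String name);
}

// A read-only, registry-backed catalog: nothing is ever written through Flink.
class ReadOnlyRegistryCatalog implements ReadableCatalog {
    public boolean tableExists(String name) { return false; }
}

// A catalog that supports both reads and writes.
class InMemoryCatalog implements ReadableWritableCatalog {
    private final Map<String, Object> tables = new HashMap<>();
    public boolean tableExists(String name) { return tables.containsKey(name); }
    public void createTable(String name) { tables.put(name, new Object()); }
}

class CatalogManagerSketch {
    private final Map<String, ReadableCatalog> catalogs = new HashMap<>();

    void register(String name, ReadableCatalog catalog) { catalogs.put(name, catalog); }

    // The check proposed in the comment: the manager verifies writability once,
    // so TableEnvironment can ask for a writable catalog without its own checks.
    ReadableWritableCatalog getReadableWritableCatalog(String name) {
        ReadableCatalog catalog = catalogs.get(name);
        if (!(catalog instanceof ReadableWritableCatalog)) {
            throw new UnsupportedOperationException("Catalog '" + name + "' is read-only");
        }
        return (ReadableWritableCatalog) catalog;
    }
}
```

With this shape, a read-only catalog simply never implements the write methods, instead of implementing 13+ unsupported ones.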
[GitHub] [flink] GJL commented on issue #8332: [FLINK-12390][build] Fully migrate to flink-shaded-asm-6
GJL commented on issue #8332: [FLINK-12390][build] Fully migrate to flink-shaded-asm-6 URL: https://github.com/apache/flink/pull/8332#issuecomment-489154787 @flinkbot approve all
[GitHub] [flink] sjwiesman commented on a change in pull request #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation
sjwiesman commented on a change in pull request #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation URL: https://github.com/apache/flink/pull/8326#discussion_r280830610 ## File path: docs/ops/deployment/aws.md ## @@ -64,81 +64,11 @@ HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/ {% top %} -## S3: Simple Storage Service - -[Amazon Simple Storage Service](http://aws.amazon.com/s3/) (Amazon S3) provides cloud object storage for a variety of use cases. You can use S3 with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{ site.baseurl}}/ops/state/state_backends.html) or even as a YARN object storage. - -You can use S3 objects like regular files by specifying paths in the following format: - -{% highlight plain %} -s3://<your-bucket>/<endpoint> -{% endhighlight %} - -The endpoint can either be a single file or a directory, for example: - -{% highlight java %} -// Read from S3 bucket -env.readTextFile("s3://<bucket>/<endpoint>"); - -// Write to S3 bucket -stream.writeAsText("s3://<bucket>/<endpoint>"); - -// Use S3 as FsStatebackend -env.setStateBackend(new FsStateBackend("s3://<bucket>/<endpoint>")); -{% endhighlight %} - -Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI. - -For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 -filesystem wrappers which are fairly easy to set up. For some cases, however, e.g. for using S3 as -YARN's resource storage dir, it may be necessary to set up a specific Hadoop S3 FileSystem -implementation. Both ways are described below.
- -### Shaded Hadoop/Presto S3 file systems (recommended) - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the -`opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g. - -{% highlight bash %} -cp ./opt/flink-s3-fs-presto-{{ site.version }}.jar ./lib/ -{% endhighlight %} - -Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem -wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers -for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can -use this to use both at the same time. - - Configure Access Credentials - -After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets. - -# Identity and Access Management (IAM) (Recommended) - -The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html). - -If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. - -# Access Keys (Discouraged) - -Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). 
- -You need to configure both `s3.access-key` and `s3.secret-key` in Flink's `flink-conf.yaml`: - -{% highlight yaml %} -s3.access-key: your-access-key -s3.secret-key: your-secret-key -{% endhighlight %} - -{% top %} - -### Hadoop-provided S3 file systems - manual setup +### Hadoop-provided S3 file systems {% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} -This setup is a bit more complex and we recommend using our shaded Hadoop/Presto file systems -instead (see above) unless required otherwise, e.g. for using S3 as YARN's resource storage dir +Apache Flink provides native [S3 FileSystems](../filesystems/s3.html) out of the box and we recommend using them unless required otherwise, e.g. for using S3 as YARN's resource storage dir Review comment: I think that's reasonable. I believe most users are using the built-in S3 filesystems at this point.
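The scheme-based registration described in the removed text (both wrappers serve `s3://`, while `flink-s3-fs-hadoop` additionally serves `s3a://` and `flink-s3-fs-presto` serves `s3p://`) can be illustrated with a small stand-alone sketch. This is not Flink's FileSystem loader, just a toy resolver showing how distinct schemes let both implementations coexist:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Toy resolver: maps a URI scheme to the filesystem wrapper that registered it.
class S3SchemeRouter {
    private final Map<String, String> registry = new HashMap<>();

    S3SchemeRouter() {
        // flink-s3-fs-hadoop registers s3:// and s3a://
        registry.put("s3", "flink-s3-fs-hadoop");
        registry.put("s3a", "flink-s3-fs-hadoop");
        // flink-s3-fs-presto registers s3p:// (it would also claim s3://;
        // in this sketch the shared scheme stays with the hadoop wrapper)
        registry.put("s3p", "flink-s3-fs-presto");
    }

    String resolve(String uri) {
        String scheme = URI.create(uri).getScheme();
        String impl = registry.get(scheme);
        if (impl == null) {
            throw new IllegalArgumentException("No filesystem registered for scheme: " + scheme);
        }
        return impl;
    }
}
```

Using `s3a://` and `s3p://` paths in the same job thus routes each access to the intended implementation.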
[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#issuecomment-489140860 Ok, I think I didn't catch the issue because I was using mvn 3.5, and the shade plugin seems to behave a bit differently on it. When I built with 3.2.5 (what we default to in the project), I was able to trigger the failure. It seems to be due to the flink-s3-fs-base module filtering all the classes in the util package (where I had placed the HadoopConfigLoader). I'm not really sure of the context of the original change, so I've updated it to only filter the HadoopUtils class (no other classes in that package).
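The narrowed filter described in the comment might look roughly like the following maven-shade-plugin fragment. This is a hypothetical sketch: the artifact coordinates and class paths are illustrative, not the actual pom change in the PR.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <filters>
      <filter>
        <!-- illustrative artifact coordinates -->
        <artifact>org.apache.flink:flink-s3-fs-base</artifact>
        <excludes>
          <!-- Before: a package-wide pattern like org/apache/flink/runtime/util/**
               also dropped HadoopConfigLoader, which other filesystems need.
               After: exclude only the single class (path illustrative). -->
          <exclude>org/apache/flink/runtime/util/HadoopUtils.class</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
</plugin>
```

The design point is that shade filters match whole path patterns, so a package-wide exclude silently removes classes added to that package later.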
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280777174 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) { + List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280761093 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { Review comment: We discussed in Slack about using state transitions but did not commit to this decision yet. I think we should continue relying on `onPartitionConsumable` for now. 
As @shuai-xu already mentioned in Slack, there may be operators that take a very long time until they produce a result, e.g., aggregations, sorting, etc.
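The reliance on `onPartitionConsumable` (rather than on producer state transitions) can be sketched with a toy tracker. `ConsumableInputTracker` is illustrative, not Flink's implementation; a producer in RUNNING state (e.g. a sort or aggregation) may not have produced any consumable data yet, so the downstream vertex reacts to consumability notifications instead:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Mirrors the semantics of Flink's InputDependencyConstraint:
// ANY  -> schedulable once any input partition is consumable,
// ALL  -> schedulable only once all input partitions are consumable.
enum InputDependencyConstraint { ANY, ALL }

class ConsumableInputTracker {
    private final List<String> inputPartitions;            // partitions this vertex consumes
    private final Set<String> consumable = new HashSet<>();
    private final InputDependencyConstraint constraint;

    ConsumableInputTracker(List<String> inputPartitions, InputDependencyConstraint constraint) {
        this.inputPartitions = inputPartitions;
        this.constraint = constraint;
    }

    // Called when a producer reports one of its result partitions as consumable.
    // Returns true once the vertex should be scheduled.
    boolean onPartitionConsumable(String partitionId) {
        if (inputPartitions.contains(partitionId)) {
            consumable.add(partitionId);
        }
        return constraint == InputDependencyConstraint.ANY
                ? !consumable.isEmpty()
                : consumable.size() == inputPartitions.size();
    }
}
```

Keying the decision on partition consumability rather than on the producer entering a terminal state is exactly what makes ANY-style pipelined scheduling possible for slow producers.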
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280771723 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ##
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280769132 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## + if (!executionVertexDeploymentOptions.isEmpty()) { Review comment: This check should not be necessary. The call `schedulerOperations.allocateSlotsAndDeploy()` should also work if the input collection is empty.
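The point about tolerating empty input can be demonstrated with a minimal stand-in for `allocateSlotsAndDeploy` (names illustrative, not Flink's classes): if the method simply iterates its argument, calling it with an empty collection is a harmless no-op, so the caller's `isEmpty()` guard adds nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for SchedulerOperations: deploying an empty list of
// deployment options does nothing, so no pre-check is needed by callers.
class SchedulerOperationsSketch {
    final List<String> deployed = new ArrayList<>();

    void allocateSlotsAndDeploy(List<String> deploymentOptions) {
        for (String option : deploymentOptions) {   // zero iterations when empty
            deployed.add(option);
        }
    }
}
```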
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280761548 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); +
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280713033

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ##

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.scheduler.strategy;

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready.
 */
public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {

    private final SchedulerOperations schedulerOperations;

    private final SchedulingTopology schedulingTopology;

    private final DeploymentOption deploymentOption = new DeploymentOption(true);

    private final JobGraph jobGraph;

    private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;

    public LazyFromSourcesSchedulingStrategy(
            SchedulerOperations schedulerOperations,
            SchedulingTopology schedulingTopology,
            JobGraph jobGraph) {
        this.schedulerOperations = checkNotNull(schedulerOperations);
        this.schedulingTopology = checkNotNull(schedulingTopology);
        this.jobGraph = checkNotNull(jobGraph);
        this.intermediateDataSets = new HashMap<>();
    }

    @Override
    public void startScheduling() {
        List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
        for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) {
            if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
                // schedule vertices without consumed result partition
                executionVertexDeploymentOptions.add(
                    new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
            }

            Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
            if (partitions != null) {
                for (SchedulingResultPartition srp : partitions) {
                    SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(),
                        (key) -> new SchedulingIntermediateDataSet());
                    sid.increaseProducerCnt();
                }
            }
        }
        if (!executionVertexDeploymentOptions.isEmpty()) {
            schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
        }
    }

    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
```

Review comment: If the input set of `ExecutionVertexIDs` contains all execution vertices, this strategy degrades to the `EagerSchedulingStrategy`. Conceptually there is no difference between `restartTasks` and `startScheduling`. Both methods iterate over a set of vertices and schedule those whose input dependencies are fulfilled. The difference is that in `rest
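The reviewer's observation — that `restartTasks` and `startScheduling` both reduce to "schedule every given vertex whose input dependencies are fulfilled" — can be sketched as a shared helper that both entry points would delegate to. The types and method names below are simplified stand-ins, not Flink's actual scheduling API:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a scheduling vertex; illustrative only.
class MiniVertex {
    final String id;
    final List<MiniVertex> inputs = new ArrayList<>();
    boolean finished;

    MiniVertex(String id) {
        this.id = id;
    }
}

public class SharedSchedulingSketch {
    // Both startScheduling and restartTasks reduce to this: schedule every
    // vertex in the given candidate set whose input dependencies are fulfilled.
    static List<String> scheduleReadyVertices(List<MiniVertex> candidates) {
        List<String> toDeploy = new ArrayList<>();
        for (MiniVertex vertex : candidates) {
            boolean inputsReady = true;
            for (MiniVertex input : vertex.inputs) {
                if (!input.finished) {
                    inputsReady = false;
                    break;
                }
            }
            if (inputsReady) {
                toDeploy.add(vertex.id);
            }
        }
        return toDeploy;
    }
}
```

With such a helper, `startScheduling` passes all vertices and `restartTasks` passes only `verticesNeedingRestart`; when the restart set contains every vertex, the two calls coincide, which is exactly the degenerate case the reviewer describes.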
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280786571

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ##

```java
package org.apache.flink.runtime.scheduler.strategy;

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

/**
 * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
 */
public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {

    /**
     * Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology.
     */
    @Test
    public void testStartScheduling() {
        JobVertex jobVertex1 = new JobVertex("vertex#1");
        JobVertex jobVertex2 = new JobVertex("vertex#2");
        JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
        jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology();

        for (int i = 0; i < 3; i++) {
            schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex1.getID(), i));
            schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex2.getID(), i));
        }

        TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation();
        LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy(
            testingSchedulerOperation,
            schedulingTopology,
            graph);

        schedulingStrategy.startScheduling();

        assertEquals(3, testingSchedulerOperation.getScheduledVertices().size());
    }

    /**
     * Tests that when on execution state change will start available downstream vertices.
     *
     *   vertex#0    vertex#1
     *        \        /
     *         \      /
     *          \    /
     *      (BLOCKING, ALL)
     *        vertex#3    vertex#2
     *             \        /
     *              \      /
     *               \    /
     *           (BLOCKING, ANY)
     *             vertex#4
     *                |
     *                |
     *                |
     *            (PIPELINED)
     *             vertex#5
     */
    @Test
    public void testOnExecutionStateChange() {
        TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology();
```

Review comment: I think this method is a bit too long. It could be split into more units (see Clean Code by Robert C Martin, Chapter 3).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
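The "split into more units" suggestion can be illustrated by extracting the repetitive fixture setup into small, intention-revealing helpers. The method names below are hypothetical, not taken from the PR:

```java
import java.util.ArrayList;
import java.util.List;

public class TestSetupSketch {
    // Instead of one long test body, build the fixture through small units.
    static List<String> createVertices(int count) {
        List<String> vertices = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            vertices.add("vertex#" + i);
        }
        return vertices;
    }

    // Stand-in for connectNewDataSetAsInput(...): records the edge so the
    // test body reads as a topology description rather than boilerplate.
    static String connect(String producer, String consumer, String type) {
        return producer + " -(" + type + ")-> " + consumer;
    }
}
```

With helpers like these, the long `testOnExecutionStateChange` body shrinks to a handful of lines that mirror the topology diagram in its Javadoc.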
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280787008

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ##
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280713503

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ##

```java
if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
```

Review comment: I don't think accessing the `jobGraph` is necessary here. Isn't this the same as `schedulingVertex.getConsumedResultPartitions().isEmpty()`?
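The reviewer's simplification — a vertex is an input (source) vertex exactly when it consumes no result partitions — needs no `JobGraph` lookup at all. A minimal sketch, where the `String` elements stand in for Flink's `SchedulingResultPartition`:

```java
import java.util.Collection;

public class InputVertexCheckSketch {
    // A vertex is an input vertex exactly when it consumes no result
    // partitions, so the JobGraph lookup can be replaced by this local check.
    static boolean isInputVertex(Collection<String> consumedResultPartitions) {
        return consumedResultPartitions.isEmpty();
    }
}
```

Besides avoiding the extra dependency on `JobGraph`, this keeps the strategy working purely against the `SchedulingTopology` abstraction it is given.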
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280795534

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ##
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280766308

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ##
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280769682

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ##

```java
Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
if (partitions != null) {
```

Review comment: Returning `null` should be avoided. All `SchedulingVertex` implementations should be required to return an empty collection instead. The null check can be removed then.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
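The null-avoidance advice follows the standard "empty collection instead of null" idiom: callers can iterate unconditionally, and the null check disappears. A minimal sketch (the method name mirrors the one under review, but the implementation is only an illustration):

```java
import java.util.Collection;
import java.util.Collections;

public class EmptyCollectionSketch {
    // Returning an empty collection instead of null lets every caller iterate
    // without a null check. Collections.emptyList() is a shared immutable
    // instance, so this costs no extra allocation per call.
    static Collection<String> getProducedResultPartitions(Collection<String> partitions) {
        return partitions == null ? Collections.<String>emptyList() : partitions;
    }
}
```

With this contract in place, the `if (partitions != null)` guard in `startScheduling` becomes dead code and can be deleted.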
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280777174

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ##
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280762347 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collection; + +/** + * Simple implementation of {@link SchedulingResultPartition} for testing. 
+ */
+public class TestingSchedulingResultPartition implements SchedulingResultPartition {
+
+	private final IntermediateDataSetID intermediateDataSetID;
+
+	private final IntermediateResultPartitionID intermediateResultPartitionID;
+
+	private final ResultPartitionType partitionType;
+
+	private final SchedulingVertex producer;
+
+	private Collection<SchedulingVertex> consumers;
+
+	TestingSchedulingResultPartition(IntermediateDataSetID dataSetID,
+			IntermediateResultPartitionID partitionID, ResultPartitionType type, SchedulingVertex producer,
+			Collection<SchedulingVertex> consumers) {
+		this.intermediateDataSetID = dataSetID;
+		this.intermediateResultPartitionID = partitionID;
+		this.partitionType = type;
+		this.producer = producer;
+		this.consumers = consumers;
+	}
+
+	@Override
+	public IntermediateResultPartitionID getId() {
+		return intermediateResultPartitionID;
+	}
+
+	@Override
+	public IntermediateDataSetID getResultId() {
+		return intermediateDataSetID;
+	}
+
+	@Override
+	public ResultPartitionType getPartitionType() {
+		return partitionType;
+	}
+
+	@Override
+	public ResultPartitionState getState() {
+		return ResultPartitionState.DONE;
+	}
+
+	@Override
+	public SchedulingVertex getProducer() {
+		return producer;
+	}
+
+	@Override
+	public Collection<SchedulingVertex> getConsumers() {
+		return consumers;
+	}
+
+	public void setConsumers(Collection<SchedulingVertex> consumers) {
+		this.consumers = consumers;

Review comment: I think that `null` should not be allowed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280762928 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collection; + +/** + * Simple implementation of {@link SchedulingResultPartition} for testing. + */ +public class TestingSchedulingResultPartition implements SchedulingResultPartition { + private final IntermediateDataSetID intermediateDataSetID; + + private final IntermediateResultPartitionID intermediateResultPartitionID; + + private final ResultPartitionType partitionType; + + private final SchedulingVertex producer; + + private Collection consumers; Review comment: This field can be initialized with the constant `Collections.emptyList()`. 
The getter `getConsumers()` should never return `null`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
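A minimal sketch of the pattern both review comments point at — default the field to an immutable empty list, reject `null` in the setter, so the getter can never return `null`. The class and field names here are illustrative stand-ins, not Flink's actual test class, and `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

// Illustrative stand-in for the reviewed test class.
class PartitionStub {
	// Initialized with an immutable constant, never null.
	private Collection<String> consumers = Collections.emptyList();

	// Reject null up front instead of deferring the NPE to a later read.
	public void setConsumers(Collection<String> consumers) {
		this.consumers = Objects.requireNonNull(consumers, "consumers must not be null");
	}

	public Collection<String> getConsumers() {
		return consumers;
	}
}
```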
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280761093 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { Review comment: We discussed in Slack using state transitions but did not commit to this decision yet. I think we should continue relying on `onPartitionConsumable` for now. 
As @shuai-xu already mentioned in Slack, there may be operators that take a very long time until they produce a result, e.g., aggregations, sorting, etc.
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280703833 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); +
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280777824 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); +
[GitHub] [flink] twalthr commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
twalthr commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280798765 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a time WITHOUT timezone consisting of {@code hour:minute:second[.fractional]} with up Review comment: The SQL standard also defines a `TIME WITH TIME ZONE`. I will explicitly mention that we won't support that. Java does also not provide a time class for such a type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
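As a point of comparison, `java.time.LocalTime` is the JDK type that matches a TIME *without* time zone, including the `hour:minute:second[.fractional]` shape discussed above:

```java
import java.time.LocalTime;

public class LocalTimeDemo {
	public static void main(String[] args) {
		// hour:minute:second[.fractional]; no time-zone component at all
		LocalTime t = LocalTime.of(14, 30, 5, 123_000_000); // 123 ms expressed as nanoseconds
		System.out.println(t); // prints 14:30:05.123
	}
}
```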
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r280775926 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); +
[GitHub] [flink] sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist
sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330#issuecomment-489107895 Thanks for the review, I've updated the JobManager section to be more clear and also fixed a few other grammar issues I noticed on the second pass. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
StephanEwen edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function URL: https://github.com/apache/flink/pull/8280#issuecomment-489105632

This PR does a few things at the same time:

1. It recurses into fields, cleaning them.
2. It skips closure cleaning of functions that have a `writeObject` method or are `Externalizable`.
3. It adds checks to skip classes that are not inner classes.

Recursion into non-transient and non-static fields probably solves the "wrapping function" problem, but it does much more than that. While this can be a good addition, we definitely need a way to turn this off, so we need to extend the closure cleaner configuration to support "don't clean", "clean", and "clean recursive".

I think that (2) and (3) are good additions, but the check should probably be at the top level, not only before checking fields. Especially (3) would help make this faster, because many classes can be skipped right away. This check should be simpler, though. We don't need to distinguish member/local/anonymous/etc. classes:

```java
public static void clean(Object function) {
	if (function == null) {
		return;
	}

	final Class<?> clazz = function.getClass();
	// Only non-static inner classes capture an enclosing instance and need cleaning.
	if (clazz.getEnclosingClass() == null || Modifier.isStatic(clazz.getModifiers())) {
		return;
	}

	// do the cleaning
}
```
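The proposed skip check can be exercised in isolation. The sketch below (hypothetical class names, not Flink code) shows why top-level classes and static nested classes are safe to skip, while anonymous inner classes are exactly the ones that still need cleaning:

```java
import java.lang.reflect.Modifier;
import java.util.function.Supplier;

public class CleanCheckDemo {

	// Static nested classes hold no reference to an enclosing instance.
	static class StaticNested implements Supplier<Integer> {
		@Override public Integer get() { return 1; }
	}

	// Inverse of the proposed skip condition: cleaning is only needed for
	// classes that are nested AND not static.
	static boolean needsCleaning(Object function) {
		if (function == null) {
			return false;
		}
		Class<?> clazz = function.getClass();
		return clazz.getEnclosingClass() != null && !Modifier.isStatic(clazz.getModifiers());
	}

	public static void main(String[] args) {
		Supplier<Integer> anonymous = new Supplier<Integer>() {
			@Override public Integer get() { return 2; }
		};
		System.out.println(needsCleaning(new StaticNested()));   // false
		System.out.println(needsCleaning(anonymous));            // true
		System.out.println(needsCleaning("a top-level String")); // false
	}
}
```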
[GitHub] [flink] StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources
StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources URL: https://github.com/apache/flink/pull/8282#discussion_r280777846 ## File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java ## @@ -107,23 +107,23 @@ private void serializeAndDeserialize(final AvroOutputFormat.Codec codec, final S outputFormat.setSchema(schema); } - final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - - // when - try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) { - oos.writeObject(outputFormat); - } - try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray( { - // then - Object o = ois.readObject(); - assertTrue(o instanceof AvroOutputFormat); - @SuppressWarnings("unchecked") - final AvroOutputFormat restored = (AvroOutputFormat) o; - final AvroOutputFormat.Codec restoredCodec = (AvroOutputFormat.Codec) Whitebox.getInternalState(restored, "codec"); - final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema"); - - assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null); - assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null); + try (final ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + // when + try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(outputFormat); + } + try (final ByteArrayInputStream bais = new ByteArrayInputStream(bos.toByteArray()); + final ObjectInputStream ois = new ObjectInputStream(bais)) { Review comment: same as above, would not separate the nested streams/readers This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources
StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources URL: https://github.com/apache/flink/pull/8282#discussion_r280776846 ## File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java ## @@ -56,8 +56,9 @@ public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception { final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo(); - try (BufferedReader reader = new BufferedReader(new InputStreamReader( Review comment: I think changes like this one are not necessary. The `BufferedReader` forwards the `close()` call to the nested `InputStreamReader`. So there is no need to have them separate in the `try(...)` clause. I would keep the original pattern, which seems easier to read/format. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
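The point that `close()` on the outer wrapper cascades to the nested stream can be checked directly. This is a standalone sketch, not Flink code; `TrackingStream` is a made-up helper that records whether `close()` reached it:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class CloseForwardingDemo {

	// An input stream that records whether close() ever reached it.
	static class TrackingStream extends ByteArrayInputStream {
		boolean closed = false;

		TrackingStream(byte[] buf) { super(buf); }

		@Override
		public void close() throws IOException {
			closed = true;
			super.close();
		}
	}

	public static void main(String[] args) throws IOException {
		TrackingStream in = new TrackingStream("hello".getBytes(StandardCharsets.UTF_8));
		// Only the outermost wrapper is declared; closing it closes the whole chain.
		try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
			System.out.println(reader.readLine()); // prints hello
		}
		System.out.println(in.closed); // prints true
	}
}
```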
[GitHub] [flink] StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources
StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources URL: https://github.com/apache/flink/pull/8282#discussion_r280778538 ## File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java ## @@ -66,27 +66,28 @@ public void testComplexStringsDirecty() { ByteArrayOutputStream baos = new ByteArrayOutputStream(512); { - DataOutputStream dataOut = new DataOutputStream(baos); - DataOutputEncoder encoder = new DataOutputEncoder(); - encoder.setOut(dataOut); + try (DataOutputStream dataOut = new DataOutputStream(baos)) { + DataOutputEncoder encoder = new DataOutputEncoder(); + encoder.setOut(dataOut); - encoder.writeString(testString); - dataOut.flush(); - dataOut.close(); + encoder.writeString(testString); + dataOut.flush(); + } } byte[] data = baos.toByteArray(); // deserialize { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dataIn = new DataInputStream(bais); - DataInputDecoder decoder = new DataInputDecoder(); - decoder.setIn(dataIn); + try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) { Review comment: same as above
[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication URL: https://github.com/apache/flink/pull/7688#discussion_r280773729 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ## @@ -195,6 +198,22 @@ public static SSLHandlerFactory createRestClientSSLEngineFactory(final Configura return config.getString(SecurityOptions.SSL_ALGORITHMS).split(","); } + private static SslProvider getSSLProvider(final Configuration config) { + checkNotNull(config, "config must not be null"); + String providerString = config.getString(SecurityOptions.SSL_PROVIDER); + if (providerString.equalsIgnoreCase("OPENSSL")) { + if (OpenSsl.isAvailable()) { + return OPENSSL; + } else { + return JDK; + } + } else if (providerString.equalsIgnoreCase("JDK")) { + return JDK; + } else { + throw new IllegalArgumentException("Unknown SSL provider: " + providerString); Review comment: `IllegalArgumentException` is a runtime exception that doesn't use our exception hierarchy. Did you mean `IllegalConfigurationException`?
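A minimal sketch of the suggested change, with a local stand-in for Flink's `IllegalConfigurationException` so the snippet stays self-contained (the real class lives in `org.apache.flink.configuration`, and the surrounding `SSLUtils` method is not reproduced here):

```java
import java.util.Locale;

public class SslProviderParsing {

    // Hypothetical stand-in for Flink's IllegalConfigurationException,
    // declared locally so this sketch compiles on its own.
    static class IllegalConfigurationException extends RuntimeException {
        IllegalConfigurationException(String message) {
            super(message);
        }
    }

    enum SslProvider { JDK, OPENSSL }

    // Resolve the configured provider string, throwing the
    // configuration-specific exception for unknown values as the review
    // suggests, instead of a bare IllegalArgumentException.
    static SslProvider parseProvider(String providerString) {
        switch (providerString.toUpperCase(Locale.ROOT)) {
            case "JDK":
                return SslProvider.JDK;
            case "OPENSSL":
                return SslProvider.OPENSSL;
            default:
                throw new IllegalConfigurationException(
                    "Unknown SSL provider: " + providerString);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseProvider("openssl"));
    }
}
```

Using a project-specific exception type lets callers distinguish configuration mistakes from programming errors when handling failures.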
[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication URL: https://github.com/apache/flink/pull/7688#discussion_r280771834 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java ## @@ -135,7 +134,8 @@ protected void initChannel(SocketChannel ch) { // SSL should be the first handler in the pipeline if (sslFactory != null) { - ch.pipeline().addLast("ssl", sslFactory.createNettySSLHandler()); + ch.pipeline().addLast("ssl", Review comment: I do not understand this `[FLINK-9816][network] netty-fy SSL configuration` commit. What does it do? There are no new tests, no changes in the existing tests, no documentation and nothing in the commit message :( Is it a refactor? If so, please explain in the commit message what you are refactoring and why.
[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication URL: https://github.com/apache/flink/pull/7688#discussion_r280771141 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java ## @@ -386,7 +386,8 @@ public void testCreateSSLEngineFactory() throws Exception { serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256"); final SSLHandlerFactory serverSSLHandlerFactory = SSLUtils.createInternalServerSSLEngineFactory(serverConfig); - final SslHandler sslHandler = serverSSLHandlerFactory.createNettySSLHandler(); + // note: a 'null' allocator seems to work here (probably only because we do not use the ssl engine!) + final SslHandler sslHandler = serverSSLHandlerFactory.createNettySSLHandler(null); Review comment: `io.netty.buffer.UnpooledByteBufAllocator#DEFAULT`?
[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication URL: https://github.com/apache/flink/pull/7688#discussion_r280772939 ## File path: flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ## @@ -299,6 +300,26 @@ .defaultValue(true) .withDescription("Flag to enable peer’s hostname verification during ssl handshake."); + /** +* SSL engine provider. +*/ + public static final ConfigOption<String> SSL_PROVIDER = + key("security.ssl.provider") + .defaultValue("JDK") + .withDescription(Description.builder() + .text("The SSL engine provider to use for the ssl transport:") + .list( + TextElement.text("%s: default Java-based SSL engine", TextElement.code("JDK")), + TextElement.text("%s: openSSL-based SSL engine using system libraries" + + " (falls back to JDK if not available)", TextElement.code("OPENSSL")) Review comment: It would be safer to fail instead of falling back if the user specifies openSSL explicitly.
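The fail-fast behaviour the reviewer asks for can be sketched as follows; `resolve` and the `openSslAvailable` flag are illustrative stand-ins for the PR's `getSSLProvider(Configuration)` and netty's `OpenSsl.isAvailable()`, not the actual Flink code:

```java
public class SslProviderResolution {

    enum SslProvider { JDK, OPENSSL }

    // When the user asks for OPENSSL explicitly and the native library is
    // missing, throw instead of silently degrading to the JDK engine, so a
    // misconfigured deployment is caught immediately at startup.
    static SslProvider resolve(String configured, boolean openSslAvailable) {
        if ("OPENSSL".equalsIgnoreCase(configured)) {
            if (!openSslAvailable) {
                throw new IllegalStateException(
                    "security.ssl.provider was set to OPENSSL, "
                        + "but the native openSSL library is not available");
            }
            return SslProvider.OPENSSL;
        }
        return SslProvider.JDK;
    }

    public static void main(String[] args) {
        System.out.println(resolve("JDK", false));
        System.out.println(resolve("OPENSSL", true));
    }
}
```

The trade-off: a silent fallback keeps the cluster running, but the operator believes they are on the faster openSSL engine when they are not; failing fast surfaces the problem where it can be fixed.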
[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication URL: https://github.com/apache/flink/pull/7688#discussion_r280775627 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java ## @@ -52,6 +60,23 @@ private static final String KEY_STORE_PASSWORD = "password"; private static final String KEY_PASSWORD = "password"; + public static final List<String> AVAILABLE_SSL_PROVIDERS; + static { + if (OpenSsl.isAvailable()) { + AVAILABLE_SSL_PROVIDERS = Arrays.asList("JDK", "OPENSSL"); Review comment: Do we test for this somehow on travis? Manually? Is there some kind of instruction on how to make `OpenSsl` available during testing?
[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#issuecomment-489094928 Yeah let me check. I kicked off a `mvn clean install -DskipTests` on my machine before submitting and that seems to have gone through ok. I'll try and repro what the travis build is doing and see if I can trigger the failure.
[GitHub] [flink] flinkbot edited a comment on issue #8285: FLINK- Fix typo in "Window Operator" documentation
flinkbot edited a comment on issue #8285: FLINK- Fix typo in "Window Operator" documentation URL: https://github.com/apache/flink/pull/8285#issuecomment-487081426 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @StephanEwen [PMC] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @StephanEwen [PMC] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @StephanEwen [PMC] * ✅ 5. Overall code [quality] is good. - Approved by @StephanEwen [PMC] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" documentation
StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" documentation URL: https://github.com/apache/flink/pull/8285#issuecomment-489093278 @flinkbot approve all
[GitHub] [flink] StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" documentation
StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" documentation URL: https://github.com/apache/flink/pull/8285#issuecomment-489093241 Thanks for the patch. Merging this...
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280736591 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a timestamp WITH timezone consisting of {@code year-month-day hour:minute:second[.fractional] zone} + * with up to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to + * {@code 9999-12-31 23:59:59.999999999 -14:59}. Compared to the SQL standard, leap seconds (23:59:60 and + * 23:59:61) are not supported as the semantics are closer to {@link java.time.OffsetDateTime}.
+ * + * The serialized string representation is {@code TIMESTAMP(p) WITH TIME ZONE} where {@code p} is + * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0 and + * 9 (both inclusive). If no precision is specified, {@code p} is equal to 6. + * + * Compared to {@link LocalZonedTimestampType}, the time zone offset information is physically + * stored in every datum. It is used individually for every computation, visualization, or communication + * to external systems. + * + * A conversion from {@link java.time.ZonedDateTime} ignores the zone ID. + * + * @see TimestampType + * @see LocalZonedTimestampType + */ +@PublicEvolving +public final class ZonedTimestampType extends LogicalType { + + private static final int MIN_PRECISION = 0; + + private static final int MAX_PRECISION = 9; + + private static final int DEFAULT_PRECISION = 6; + + private static final String DEFAULT_FORMAT = "TIMESTAMP(%d) WITH TIME ZONE"; + + private static final Set<String> INPUT_CONVERSION = conversionSet( + java.time.ZonedDateTime.class.getName(), + java.time.OffsetDateTime.class.getName()); + + private static final Set<String> OUTPUT_CONVERSION = conversionSet( + java.time.OffsetDateTime.class.getName()); + + private static final Class<?> DEFAULT_CONVERSION = java.time.OffsetDateTime.class; + + /** +* Internal timestamp kind for time attribute metadata. +*/ + @Internal + public enum TimestampKind { Review comment: Could we extract this and use it for all `Timestamp` types?
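The extraction Dawid suggests could look roughly like this: a single top-level enum referenced by every timestamp class instead of a nested copy per class. The surrounding classes here are simplified, hypothetical stand-ins, not Flink's real types:

```java
public class SharedTimestampKind {

    // One shared enum for time attribute metadata; the constant names are
    // illustrative, mirroring the idea rather than the exact PR contents.
    enum TimestampKind { REGULAR, ROWTIME, PROCTIME }

    // Each timestamp type references the shared enum instead of
    // declaring its own nested copy.
    static final class ZonedTimestampType {
        final TimestampKind kind;
        final int precision;

        ZonedTimestampType(TimestampKind kind, int precision) {
            this.kind = kind;
            this.precision = precision;
        }
    }

    static final class LocalZonedTimestampType {
        final TimestampKind kind;
        final int precision;

        LocalZonedTimestampType(TimestampKind kind, int precision) {
            this.kind = kind;
            this.precision = precision;
        }
    }

    public static void main(String[] args) {
        ZonedTimestampType t = new ZonedTimestampType(TimestampKind.ROWTIME, 3);
        System.out.println(t.kind + " " + t.precision);
    }
}
```

Sharing the enum keeps the "kind" semantics identical across all timestamp variants and avoids three diverging copies of the same concept.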
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280742847 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import java.lang.reflect.Array; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of an array of elements with same subtype. Compared to the SQL standard, the maximum + * cardinality of an array cannot be specified but is fixed at {@link Integer#MAX_VALUE}. Also, any + * valid type is supported as a subtype. + * + * The serialized string representation is {@code ARRAY<t>} where {@code t} is the type of Review comment: nit: `is the type of` -> `is the logical type of`
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280737135 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a timestamp WITH LOCAL timezone consisting of {@code year-month-day hour:minute:second[.fractional] zone} + * with up to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to + * {@code 9999-12-31 23:59:59.999999999 -14:59}. Leap seconds (23:59:60 and 23:59:61) are not + * supported as the semantics are closer to {@link java.time.OffsetDateTime}.
+ * + * The serialized string representation is {@code TIMESTAMP(p) WITH LOCAL TIME ZONE} where {@code p} is + * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0 and + * 9 (both inclusive). If no precision is specified, {@code p} is equal to 6. + * + * Compared to {@link ZonedTimestampType}, the time zone offset information is not stored physically + * in ever datum. Instead, the type assumes {@link java.time.Instant} semantics in UTC time zone at Review comment: `ever` -> `every`
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280764585 ## File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java ## @@ -412,6 +419,40 @@ public void testNullType() { assertFalse(nullType.supportsOutputConversion(int.class)); } + @Test + public void testTypeInformationAnyType() { + final TypeInformationAnyType anyType = new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.INT)); + + testEquality(anyType, new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG))); + + testNullability(anyType); + + testJavaSerializability(anyType); + + testConversions(anyType, new Class[]{Tuple2.class}, new Class[]{Tuple.class}); + } + + @Test + public void testAnyType() { + testAll( + new AnyType<>(Human.class, new KryoSerializer<>(Human.class, new ExecutionConfig())), + "ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " + + "ADNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4AAATyxpo9cAA" + Review comment: Could we not hardcode the snapshot? Changes to the KryoSerializer will break this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280744074 ## File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java ## @@ -285,6 +286,27 @@ public void testDayTimeIntervalType() { ); } + @Test Review comment: How about we add a check or two for different supported array conversions? E.g. to verify that `new ArrayType(new ArrayType(new TimestampType()))` does not support `java.sql.Timestamp[].class`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280733755 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a time WITHOUT timezone consisting of {@code hour:minute:second[.fractional]} with up Review comment: nit: Can `Time` have a timezone in general? Doesn't timezone imply there has to be date component? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280738815 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type for a group of year-month interval types. The type must be parameterized to one of + * the following resolutions: interval of years, interval of years to months, or interval of months. + * + * An interval of year-month consists of {@code +years-months} with values ranging from {@code -9999-11} + * to {@code +9999-11}. The value is the same for all resolutions of this group (for example, an Review comment: I don't get this example...
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280755454 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Logical type of a user-defined distinct type. A distinct type specifies an identifier and is backed + * by a source type. A distinct types has the same internal representation as a source type, but is Review comment: `types` -> `type` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280762367 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem + * and is only deserialized at the edges. The any type is an extension to the SQL standard. 
+ * + * The serialized string representation is {@code ANY(c, s)} where {@code s} is the originating + * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in Base64 encoding. + * + * @param originating class for this type + */ +@PublicEvolving +public final class AnyType extends LogicalType { + + private static final String DEFAULT_FORMAT = "ANY(%s, %s)"; + + private static final Set INPUT_OUTPUT_CONVERSION = conversionSet( + byte[].class.getName(), + "org.apache.flink.table.dataformat.BinaryGeneric"); + + private final Class clazz; + + private final TypeSerializer serializer; + + private transient String serializerString; + + public AnyType(boolean isNullable, Class clazz, TypeSerializer serializer) { + super(isNullable, LogicalTypeRoot.ANY); + this.clazz = Preconditions.checkNotNull(clazz, "Class must not be null."); + this.serializer = Preconditions.checkNotNull(serializer, "Serializer must not be null."); + } + + public AnyType(Class clazz, TypeSerializer serializer) { + this(true, clazz, serializer); + } + + public Class getOriginatingClass() { + return clazz; + } + + public TypeSerializer getTypeSerializer() { + return serializer; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new AnyType<>(isNullable, clazz, serializer.duplicate()); + } + + @Override + public String asSummaryString() { + return withNullability(DEFAULT_FORMAT, clazz.getName(), "..."); + } + + @Override + public String asSerializableString() { + return withNullability(DEFAULT_FORMAT, clazz.getName(), getOrCreateSerializerString()); + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return this.clazz.isAssignableFrom(clazz) || + INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return clazz.isAssignableFrom(this.clazz) || + INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultOutputConversion() { + return 
clazz; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } +
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280761541 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem + * and is only deserialized at the edges. The any type is an extension to the SQL standard. 
+ * + * The serialized string representation is {@code ANY(c, s)} where {@code s} is the originating Review comment: `{@code s}` -> `{@code c}`
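To illustrate the corrected javadoc, here is a minimal standalone sketch (not Flink code; the class and method names are hypothetical) of the {@code ANY(c, s)} representation quoted in the diff, where {@code c} is the originating class name and {@code s} is the Base64-encoded serializer snapshot:

```java
import java.util.Base64;

public class AnyFormatSketch {
    // Mirrors the DEFAULT_FORMAT constant quoted in the diff under review.
    private static final String DEFAULT_FORMAT = "ANY(%s, %s)";

    // Builds the serialized string: class name plus Base64-encoded snapshot bytes.
    static String asSerializableString(Class<?> clazz, byte[] snapshotBytes) {
        String encoded = Base64.getEncoder().encodeToString(snapshotBytes);
        return String.format(DEFAULT_FORMAT, clazz.getName(), encoded);
    }

    public static void main(String[] args) {
        // Bytes {1, 2, 3} encode to "AQID" in Base64.
        System.out.println(asSerializableString(String.class, new byte[] {1, 2, 3}));
        // prints: ANY(java.lang.String, AQID)
    }
}
```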
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280738605 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type for a group of year-month interval types. The type must be parameterized to one of + * the following resolutions: interval of years, interval of years to months, or interval of months. + * + * An interval of year-month consists of {@code +years-months} with values ranging from {@code --11} + * to {@code +-11}. The value is the same for all resolutions of this group (for example, an + * interval of months of 50 leads to {@code +04-02}). 
+ * + * The serialized string representation is {@code INTERVAL YEAR(p)}, {@code INTERVAL YEAR(p) TO MONTH}, + * or {@code INTERVAL YEAR(p)} where {@code p} is the number of digits of years (=year precision). Review comment: `INTERVAL YEAR(p)` -> `INTERVAL MONTH`
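The javadoc's example ("an interval of months of 50 leads to {@code +04-02}") can be reproduced with a short normalization sketch; this is a hypothetical helper, not the Flink implementation:

```java
public class YearMonthSketch {
    // Renders a month count in the +years-months form described in the javadoc,
    // e.g. 50 months -> "+04-02" (4 years, 2 months).
    static String format(int totalMonths) {
        int abs = Math.abs(totalMonths);
        String sign = totalMonths < 0 ? "-" : "+";
        return String.format("%s%02d-%02d", sign, abs / 12, abs % 12);
    }

    public static void main(String[] args) {
        System.out.println(format(50)); // +04-02, matching the javadoc example
    }
}
```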
[GitHub] [flink] zentol commented on issue #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces
zentol commented on issue #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces URL: https://github.com/apache/flink/pull/8316#issuecomment-489086627 We should also add some tests for the `Execution*` classes to ensure the implementations are correct.
[GitHub] [flink] twalthr commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
twalthr commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280763754 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a variable-length character string. + * + * The serialized string representation is {@code VARCHAR(n)} where {@code n} is the maximum + * number of code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE} (both + * inclusive). If no length is specified, {@code n} is equal to 1. 
+ */ +@PublicEvolving +public final class VarCharType extends LogicalType { + + private static final int MIN_LENGTH = 1; + + private static final int MAX_LENGTH = Integer.MAX_VALUE; + + private static final int DEFAULT_LENGTH = 1; + + private static final String DEFAULT_FORMAT = "VARCHAR(%d)"; Review comment: It depends on the type. E.g. `ROW` and `ARRAY` could be represented in different ways. But I will correct it to `FORMAT` while merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
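The length bounds quoted in the diff (between 1 and {@link Integer#MAX_VALUE}, defaulting to 1) can be sketched as a standalone check; the class and method names here are hypothetical, not Flink's:

```java
public class VarCharLengthSketch {
    // Bounds as quoted in the VarCharType diff.
    static final int MIN_LENGTH = 1;
    static final int DEFAULT_LENGTH = 1;
    // The upper bound is Integer.MAX_VALUE, which is implicit for an int argument.

    // Rejects lengths below the minimum; valid lengths pass through unchanged.
    static int validateLength(int length) {
        if (length < MIN_LENGTH) {
            throw new IllegalArgumentException(
                "VARCHAR length must be at least " + MIN_LENGTH + ", was " + length);
        }
        return length;
    }

    public static void main(String[] args) {
        System.out.println(validateLength(DEFAULT_LENGTH)); // 1
        try {
            validateLength(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```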
[GitHub] [flink] zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces
zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces URL: https://github.com/apache/flink/pull/8316#discussion_r280747402 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/Flip1PipelinedFailoverRegionBuildingTest.java ## @@ -0,0 +1,899 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.Iterator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Tests that make sure that the building of pipelined connected failover regions works + * correctly. + */ +public class Flip1PipelinedFailoverRegionBuildingTest extends TestLogger { + + /** +* Tests that validates that a graph with single unconnected vertices works correctly. 
+* +* +* (v1) +* +* (v2) +* +* (v3) +* +* ... +* +*/ + @Test + public void testIndividualVertices() throws Exception { + final JobVertex source1 = new JobVertex("source1"); + source1.setInvokableClass(NoOpInvokable.class); + source1.setParallelism(2); + + final JobVertex source2 = new JobVertex("source2"); + source2.setInvokableClass(NoOpInvokable.class); + source2.setParallelism(2); + + final JobGraph jobGraph = new JobGraph("test job", source1, source2); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = new RestartPipelinedRegionStrategy(eg); + FailoverRegion sourceRegion11 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source1.getID()).getTaskVertices()[0].getExecutionVertexID()); + FailoverRegion sourceRegion12 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source1.getID()).getTaskVertices()[1].getExecutionVertexID()); + FailoverRegion targetRegion21 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source2.getID()).getTaskVertices()[0].getExecutionVertexID()); + FailoverRegion targetRegion22 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source2.getID()).getTaskVertices()[1].getExecutionVertexID()); + + assertTrue(sourceRegion11 != sourceRegion12); + assertTrue(sourceRegion12 != targetRegion21); + assertTrue(targetRegion21 != targetRegion22); + } + + /** +
[GitHub] [flink] zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces
zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces URL: https://github.com/apache/flink/pull/8316#discussion_r280752971 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A failover strategy that proposes to restart involved regions when a vertex fails. + * A region is defined by this strategy as tasks that communicate via pipelined data exchange. 
+ */ +public class RestartPipelinedRegionStrategy implements FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class); + + /** The topology containing info about all the vertices and edges. */ + private final FailoverTopology topology; + + /** Maps execution vertex id to failover region. */ + private final Map regions; + + /** +* Creates a new failover strategy to restart pipelined regions that works on the given topology. +* +* @param topology containing info about all the vertices and edges +*/ + public RestartPipelinedRegionStrategy(FailoverTopology topology) { + this.topology = checkNotNull(topology); + this.regions = new HashMap<>(); + + // build regions based on the given topology + LOG.info("Start building failover regions."); + buildFailoverRegions(); + } + + // + // region building + // + + private void buildFailoverRegions() { + // currently we let a job with co-location constraints fail as one region + // putting co-located vertices in the same region with each other can be a future improvement + if (topology.containsCoLocationConstraints()) { + buildOneRegionForAllVertices(); + return; + } + + // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist) + // this helps to optimize the building performance as it uses reference equality + final IdentityHashMap> vertexToRegion = new IdentityHashMap<>(); + + // iterate all the vertices which are topologically sorted + for (FailoverVertex vertex : topology.getFailoverVertices()) { + HashSet currentRegion = null; Review comment: you could initialize this to `new HashSet<>(1)`, which would simplify the loop since `currentRegion`would never be null. We would in all cases go through the merge process. You could then move `vertexToRegion.put(vertex, currentRegion);` also to the top of the loop. 
The end result would be:
```
// iterate all the vertices which are topologically sorted
for (FailoverVertex vertex : topology.getFailoverVertices()) {
	HashSet currentRegion = new HashSet<>(1);
	currentRegion.add(vertex);
	vertexToRegion.put(vertex, currentRegion);

	for (FailoverEdge inputEdge : vertex.getInputEdges()) {
		if (inputEdge.getResultPartitionType().isPipelined()) {
			final FailoverVertex producerVertex = inputEdge.getSourceVertex()
```
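The diff's comment about imitating an "IdentityHashSet (which does not exist)" via a map can also be done with a standard JDK helper; a minimal sketch (class name hypothetical) showing that membership then uses reference equality rather than `equals()`/`hashCode()`:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class IdentitySetSketch {
    // Builds an identity-based set on top of an IdentityHashMap, the same trick
    // the region-building code uses to compare vertices by reference.
    static <T> Set<T> newIdentitySet() {
        return Collections.newSetFromMap(new IdentityHashMap<>());
    }

    public static void main(String[] args) {
        Set<String> region = newIdentitySet();
        String a = new String("vertex");
        String b = new String("vertex"); // a.equals(b), but a != b
        region.add(a);
        System.out.println(region.contains(a)); // true: same reference
        System.out.println(region.contains(b)); // false: equal but distinct reference
    }
}
```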
[GitHub] [flink] zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces
zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces URL: https://github.com/apache/flink/pull/8316#discussion_r280758391 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A failover strategy that proposes to restart involved regions when a vertex fails. + * A region is defined by this strategy as tasks that communicate via pipelined data exchange. 
+ */ +public class RestartPipelinedRegionStrategy implements FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class); + + /** The topology containing info about all the vertices and edges. */ + private final FailoverTopology topology; + + /** Maps execution vertex id to failover region. */ + private final Map regions; + + /** +* Creates a new failover strategy to restart pipelined regions that works on the given topology. +* +* @param topology containing info about all the vertices and edges +*/ + public RestartPipelinedRegionStrategy(FailoverTopology topology) { + this.topology = checkNotNull(topology); + this.regions = new HashMap<>(); + + // build regions based on the given topology + LOG.info("Start building failover regions."); + buildFailoverRegions(); + } + + // + // region building + // + + private void buildFailoverRegions() { + // currently we let a job with co-location constraints fail as one region + // putting co-located vertices in the same region with each other can be a future improvement + if (topology.containsCoLocationConstraints()) { + buildOneRegionForAllVertices(); + return; + } + + // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist) + // this helps to optimize the building performance as it uses reference equality + final IdentityHashMap> vertexToRegion = new IdentityHashMap<>(); + + // iterate all the vertices which are topologically sorted + for (FailoverVertex vertex : topology.getFailoverVertices()) { + HashSet currentRegion = null; + for (FailoverEdge inputEdge : vertex.getInputEdges()) { + if (inputEdge.getResultPartitionType().isPipelined()) { + final FailoverVertex producerVertex = inputEdge.getSourceVertex(); + final HashSet producerRegion = vertexToRegion.get(producerVertex); + if (currentRegion == null) { + // use producer region as current region and add current vertex to it + if 
(producerRegion == null) { + throw new IllegalStateException("Producer task " + producerVertex.getExecutionVertexName() +