[GitHub] [flink] Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values

2019-05-03 Thread GitBox
Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix 
bug in statsd exporter when using negative values
URL: https://github.com/apache/flink/pull/8259#discussion_r280971043
 
 

 ##
 File path: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ##
 @@ -240,12 +240,6 @@ public String filterCharacters(String input) {
}
 
private boolean numberIsNegative(Number input) {
-   try {
-   return new 
BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) == -1;
-   } catch (Exception e) {
-   //not all Number's can be converted to a BigDecimal, 
such as Infinity or NaN
-   //in this case we just say it isn't a negative number
-   return false;
-   }
+   return input.doubleValue() < 0;
 
 Review comment:
   Ah sorry, no reason! 
   
   I've changed it to Double.compare(), I've also changed line #190 to use this 
method so the double check there works in the same way.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969654
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
 
-Apache Flink streaming applications are typically designed to run indefinitely 
or for long periods of time.
-As with all long-running services, the applications need to be updated to 
adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they 
evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
+对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。
 
-This page provides an overview of how you can evolve your state type's data 
schema. 
-The current restrictions varies across different types and state structures 
(`ValueState`, `ListState`, etc.).
+此页面概述了如何升级状态类型的数据结构。
+目前的限制因不同类型和状态结构而异(`ValueState`、`ListState` 等)。
 
 Review comment:
   `目前对不同类系的状态结构(`ValueState`、`ListState` 等)有不同的限制`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969652
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
 
-Apache Flink streaming applications are typically designed to run indefinitely 
or for long periods of time.
-As with all long-running services, the applications need to be updated to 
adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they 
evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
+对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。
 
-This page provides an overview of how you can evolve your state type's data 
schema. 
-The current restrictions varies across different types and state structures 
(`ValueState`, `ListState`, etc.).
+此页面概述了如何升级状态类型的数据结构。
+目前的限制因不同类型和状态结构而异(`ValueState`、`ListState` 等)。
 
 Review comment:
   `目前对不同类系的状态结构(`ValueState`、`ListState` 等)有不同的限制`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969750
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -48,62 +47,52 @@ checkpointedState = 
getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
 
-Under the hood, whether or not the schema of state can be evolved depends on 
the serializer used to read / write
-persisted state bytes. Simply put, a registered state's schema can only be 
evolved if its serializer properly
-supports it. This is handled transparently by serializers generated by Flink's 
type serialization framework
-(current scope of support is listed [below]({{ site.baseurl 
}}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)).
+在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。
+简而言之,状态数据结构只有在其序列化器正确支持时才能升级。
+这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl 
}}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。
 
-If you intend to implement a custom `TypeSerializer` for your state type and 
would like to learn how to implement
-the serializer to support state schema evolution, please refer to
-[Custom State Serialization]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html).
-The documentation there also covers necessary internal details about the 
interplay between state serializers and Flink's
-state backends to support state schema evolution.
+如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器,
+可以参考 [自定义状态序列化器]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
+本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。
 
-## Evolving state schema
+## 升级状态数据结构
 
-To evolve the schema of a given state type, you would take the following steps:
+为了对给定的状态类型进行升级,你需要采取以下几个步骤:
 
- 1. Take a savepoint of your Flink streaming job.
- 2. Update state types in your application (e.g., modifying your Avro type 
schema).
- 3. Restore the job from the savepoint. When accessing state for the first 
time, Flink will assess whether or not
- the schema had been changed for the state, and migrate state schema if 
necessary.
+ 1. 对 Flink 流作业进行 savepoint 操作。
+ 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。
+ 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。
 
-The process of migrating state to adapt to changed schemas happens 
automatically, and independently for each state.
-This process is performed internally by Flink by first checking if the new 
serializer for the state has different
-serialization schema than the previous serializer; if so, the previous 
serializer is used to read the state to objects,
-and written back to bytes again with the new serializer.
+用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。
+Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有,
+那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
 
-Further details about the migration process is out of the scope of this 
documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
+更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
 
-## Supported data types for schema evolution
+## 数据结构升级支持的数据类型
 
-Currently, schema evolution is supported only for POJO and Avro types. 
Therefore, if you care about schema evolution for
-state, it is currently recommended to always use either Pojo or Avro for state 
data types.
+目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。
+因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。
 
-There are plans to extend the support for more composite types; for more 
details,
-please refer to 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896).
+我们有计划支持更多的复合类型;更多的细节可以参考 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。
 
-### POJO types
+### POJO 类型
 
-Flink supports evolving schema of [POJO types]({{ site.baseurl 
}}/dev/types_serialization.html#rules-for-pojo-types),
-based on the following set of rules:
+Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl 
}}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级:
 
- 1. Fields can be removed. Once removed, the previous value for the removed 
field will be dropped in future checkpoints and savepoints.
- 2. New fields can be added. The new field will be initialized to the default 
value for its type, as
-[defined by 
Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html).
- 3. Declared fields types cannot change.
- 4. Class name of the POJO type cannot change, including the namespace of the 
class.
+ 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
 
 Review comment:
   `可以删除字段`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@i

[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969597
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
 
-Apache Flink streaming applications are typically designed to run indefinitely 
or for long periods of time.
-As with all long-running services, the applications need to be updated to 
adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they 
evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
+对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。
 
 Review comment:
   I think this line should append after the previous line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969785
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -48,62 +47,52 @@ checkpointedState = 
getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
 
-Under the hood, whether or not the schema of state can be evolved depends on 
the serializer used to read / write
-persisted state bytes. Simply put, a registered state's schema can only be 
evolved if its serializer properly
-supports it. This is handled transparently by serializers generated by Flink's 
type serialization framework
-(current scope of support is listed [below]({{ site.baseurl 
}}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)).
+在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。
+简而言之,状态数据结构只有在其序列化器正确支持时才能升级。
+这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl 
}}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。
 
-If you intend to implement a custom `TypeSerializer` for your state type and 
would like to learn how to implement
-the serializer to support state schema evolution, please refer to
-[Custom State Serialization]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html).
-The documentation there also covers necessary internal details about the 
interplay between state serializers and Flink's
-state backends to support state schema evolution.
+如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器,
+可以参考 [自定义状态序列化器]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
+本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。
 
-## Evolving state schema
+## 升级状态数据结构
 
-To evolve the schema of a given state type, you would take the following steps:
+为了对给定的状态类型进行升级,你需要采取以下几个步骤:
 
- 1. Take a savepoint of your Flink streaming job.
- 2. Update state types in your application (e.g., modifying your Avro type 
schema).
- 3. Restore the job from the savepoint. When accessing state for the first 
time, Flink will assess whether or not
- the schema had been changed for the state, and migrate state schema if 
necessary.
+ 1. 对 Flink 流作业进行 savepoint 操作。
+ 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。
+ 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。
 
-The process of migrating state to adapt to changed schemas happens 
automatically, and independently for each state.
-This process is performed internally by Flink by first checking if the new 
serializer for the state has different
-serialization schema than the previous serializer; if so, the previous 
serializer is used to read the state to objects,
-and written back to bytes again with the new serializer.
+用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。
+Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有,
+那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
 
-Further details about the migration process is out of the scope of this 
documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
+更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
 
-## Supported data types for schema evolution
+## 数据结构升级支持的数据类型
 
-Currently, schema evolution is supported only for POJO and Avro types. 
Therefore, if you care about schema evolution for
-state, it is currently recommended to always use either Pojo or Avro for state 
data types.
+目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。
+因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。
 
-There are plans to extend the support for more composite types; for more 
details,
-please refer to 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896).
+我们有计划支持更多的复合类型;更多的细节可以参考 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。
 
-### POJO types
+### POJO 类型
 
-Flink supports evolving schema of [POJO types]({{ site.baseurl 
}}/dev/types_serialization.html#rules-for-pojo-types),
-based on the following set of rules:
+Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl 
}}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级:
 
- 1. Fields can be removed. Once removed, the previous value for the removed 
field will be dropped in future checkpoints and savepoints.
- 2. New fields can be added. The new field will be initialized to the default 
value for its type, as
-[defined by 
Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html).
- 3. Declared fields types cannot change.
- 4. Class name of the POJO type cannot change, including the namespace of the 
class.
+ 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
+ 2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 
类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。  
 
+ 3. 不可以修改字段的声明类型。
+ 4. 不可以改变 POJO 类型的类名,包括类的命名空间。
 
-Note that the schema of POJO type state can only be evolved when restoring 
from a previous savepoint with Flink versions
-newer than 1.8.0. When restoring

[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969736
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -48,62 +47,52 @@ checkpointedState = 
getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
 
-Under the hood, whether or not the schema of state can be evolved depends on 
the serializer used to read / write
-persisted state bytes. Simply put, a registered state's schema can only be 
evolved if its serializer properly
-supports it. This is handled transparently by serializers generated by Flink's 
type serialization framework
-(current scope of support is listed [below]({{ site.baseurl 
}}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)).
+在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。
+简而言之,状态数据结构只有在其序列化器正确支持时才能升级。
+这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl 
}}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。
 
-If you intend to implement a custom `TypeSerializer` for your state type and 
would like to learn how to implement
-the serializer to support state schema evolution, please refer to
-[Custom State Serialization]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html).
-The documentation there also covers necessary internal details about the 
interplay between state serializers and Flink's
-state backends to support state schema evolution.
+如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器,
+可以参考 [自定义状态序列化器]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
+本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。
 
-## Evolving state schema
+## 升级状态数据结构
 
-To evolve the schema of a given state type, you would take the following steps:
+为了对给定的状态类型进行升级,你需要采取以下几个步骤:
 
- 1. Take a savepoint of your Flink streaming job.
- 2. Update state types in your application (e.g., modifying your Avro type 
schema).
- 3. Restore the job from the savepoint. When accessing state for the first 
time, Flink will assess whether or not
- the schema had been changed for the state, and migrate state schema if 
necessary.
+ 1. 对 Flink 流作业进行 savepoint 操作。
+ 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。
+ 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。
 
-The process of migrating state to adapt to changed schemas happens 
automatically, and independently for each state.
-This process is performed internally by Flink by first checking if the new 
serializer for the state has different
-serialization schema than the previous serializer; if so, the previous 
serializer is used to read the state to objects,
-and written back to bytes again with the new serializer.
+用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。
+Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有,
+那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
 
-Further details about the migration process is out of the scope of this 
documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
+更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
 
-## Supported data types for schema evolution
+## 数据结构升级支持的数据类型
 
-Currently, schema evolution is supported only for POJO and Avro types. 
Therefore, if you care about schema evolution for
-state, it is currently recommended to always use either Pojo or Avro for state 
data types.
+目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。
+因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。
 
-There are plans to extend the support for more composite types; for more 
details,
-please refer to 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896).
+我们有计划支持更多的复合类型;更多的细节可以参考 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。
 
-### POJO types
+### POJO 类型
 
-Flink supports evolving schema of [POJO types]({{ site.baseurl 
}}/dev/types_serialization.html#rules-for-pojo-types),
-based on the following set of rules:
+Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl 
}}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级:
 
- 1. Fields can be removed. Once removed, the previous value for the removed 
field will be dropped in future checkpoints and savepoints.
- 2. New fields can be added. The new field will be initialized to the default 
value for its type, as
-[defined by 
Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html).
- 3. Declared fields types cannot change.
- 4. Class name of the POJO type cannot change, including the namespace of the 
class.
+ 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
+ 2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 
类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。  
 
+ 3. 不可以修改字段的声明类型。
+ 4. 不可以改变 POJO 类型的类名,包括类的命名空间。
 
-Note that the schema of POJO type state can only be evolved when restoring 
from a previous savepoint with Flink versions
-newer than 1.8.0. When restoring

[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969780
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -48,62 +47,52 @@ checkpointedState = 
getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
 
-Under the hood, whether or not the schema of state can be evolved depends on 
the serializer used to read / write
-persisted state bytes. Simply put, a registered state's schema can only be 
evolved if its serializer properly
-supports it. This is handled transparently by serializers generated by Flink's 
type serialization framework
-(current scope of support is listed [below]({{ site.baseurl 
}}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)).
+在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。
+简而言之,状态数据结构只有在其序列化器正确支持时才能升级。
+这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl 
}}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。
 
-If you intend to implement a custom `TypeSerializer` for your state type and 
would like to learn how to implement
-the serializer to support state schema evolution, please refer to
-[Custom State Serialization]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html).
-The documentation there also covers necessary internal details about the 
interplay between state serializers and Flink's
-state backends to support state schema evolution.
+如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器,
+可以参考 [自定义状态序列化器]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
+本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。
 
-## Evolving state schema
+## 升级状态数据结构
 
-To evolve the schema of a given state type, you would take the following steps:
+为了对给定的状态类型进行升级,你需要采取以下几个步骤:
 
- 1. Take a savepoint of your Flink streaming job.
- 2. Update state types in your application (e.g., modifying your Avro type 
schema).
- 3. Restore the job from the savepoint. When accessing state for the first 
time, Flink will assess whether or not
- the schema had been changed for the state, and migrate state schema if 
necessary.
+ 1. 对 Flink 流作业进行 savepoint 操作。
+ 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。
+ 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。
 
-The process of migrating state to adapt to changed schemas happens 
automatically, and independently for each state.
-This process is performed internally by Flink by first checking if the new 
serializer for the state has different
-serialization schema than the previous serializer; if so, the previous 
serializer is used to read the state to objects,
-and written back to bytes again with the new serializer.
+用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。
+Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有,
+那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
 
-Further details about the migration process is out of the scope of this 
documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
+更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
 
-## Supported data types for schema evolution
+## 数据结构升级支持的数据类型
 
-Currently, schema evolution is supported only for POJO and Avro types. 
Therefore, if you care about schema evolution for
-state, it is currently recommended to always use either Pojo or Avro for state 
data types.
+目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。
+因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。
 
-There are plans to extend the support for more composite types; for more 
details,
-please refer to 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896).
+我们有计划支持更多的复合类型;更多的细节可以参考 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。
 
-### POJO types
+### POJO 类型
 
-Flink supports evolving schema of [POJO types]({{ site.baseurl 
}}/dev/types_serialization.html#rules-for-pojo-types),
-based on the following set of rules:
+Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl 
}}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级:
 
- 1. Fields can be removed. Once removed, the previous value for the removed 
field will be dropped in future checkpoints and savepoints.
- 2. New fields can be added. The new field will be initialized to the default 
value for its type, as
-[defined by 
Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html).
- 3. Declared fields types cannot change.
- 4. Class name of the POJO type cannot change, including the namespace of the 
class.
+ 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
+ 2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 
类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。  
 
+ 3. 不可以修改字段的声明类型。
+ 4. 不可以改变 POJO 类型的类名,包括类的命名空间。
 
-Note that the schema of POJO type state can only be evolved when restoring 
from a previous savepoint with Flink versions
-newer than 1.8.0. When restoring

[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969642
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
 
-Apache Flink streaming applications are typically designed to run indefinitely 
or for long periods of time.
-As with all long-running services, the applications need to be updated to 
adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they 
evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
+对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。
 
-This page provides an overview of how you can evolve your state type's data 
schema. 
-The current restrictions varies across different types and state structures 
(`ValueState`, `ListState`, etc.).
+此页面概述了如何升级状态类型的数据结构。
 
 Review comment:
   Maybe we should not translate `data schema`[1]
   
   [1] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969812
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -48,62 +47,52 @@ checkpointedState = 
getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
 
-Under the hood, whether or not the schema of state can be evolved depends on 
the serializer used to read / write
-persisted state bytes. Simply put, a registered state's schema can only be 
evolved if its serializer properly
-supports it. This is handled transparently by serializers generated by Flink's 
type serialization framework
-(current scope of support is listed [below]({{ site.baseurl 
}}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)).
+在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。
+简而言之,状态数据结构只有在其序列化器正确支持时才能升级。
+这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl 
}}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。
 
-If you intend to implement a custom `TypeSerializer` for your state type and 
would like to learn how to implement
-the serializer to support state schema evolution, please refer to
-[Custom State Serialization]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html).
-The documentation there also covers necessary internal details about the 
interplay between state serializers and Flink's
-state backends to support state schema evolution.
+如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器,
+可以参考 [自定义状态序列化器]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
+本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。
 
-## Evolving state schema
+## 升级状态数据结构
 
-To evolve the schema of a given state type, you would take the following steps:
+为了对给定的状态类型进行升级,你需要采取以下几个步骤:
 
- 1. Take a savepoint of your Flink streaming job.
- 2. Update state types in your application (e.g., modifying your Avro type 
schema).
- 3. Restore the job from the savepoint. When accessing state for the first 
time, Flink will assess whether or not
- the schema had been changed for the state, and migrate state schema if 
necessary.
+ 1. 对 Flink 流作业进行 savepoint 操作。
+ 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。
+ 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。
 
-The process of migrating state to adapt to changed schemas happens 
automatically, and independently for each state.
-This process is performed internally by Flink by first checking if the new 
serializer for the state has different
-serialization schema than the previous serializer; if so, the previous 
serializer is used to read the state to objects,
-and written back to bytes again with the new serializer.
+用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。
+Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有,
+那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
 
-Further details about the migration process is out of the scope of this 
documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
+更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
 
-## Supported data types for schema evolution
+## 数据结构升级支持的数据类型
 
-Currently, schema evolution is supported only for POJO and Avro types. 
Therefore, if you care about schema evolution for
-state, it is currently recommended to always use either Pojo or Avro for state 
data types.
+目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。
 
 Review comment:
   `目前,仅支持 POJO 和 Avro 类型的 schema 升级`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969840
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -48,62 +47,52 @@ checkpointedState = 
getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
 
-Under the hood, whether or not the schema of state can be evolved depends on 
the serializer used to read / write
-persisted state bytes. Simply put, a registered state's schema can only be 
evolved if its serializer properly
-supports it. This is handled transparently by serializers generated by Flink's 
type serialization framework
-(current scope of support is listed [below]({{ site.baseurl 
}}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)).
+在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。
+简而言之,状态数据结构只有在其序列化器正确支持时才能升级。
+这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl 
}}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。
 
-If you intend to implement a custom `TypeSerializer` for your state type and 
would like to learn how to implement
-the serializer to support state schema evolution, please refer to
-[Custom State Serialization]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html).
-The documentation there also covers necessary internal details about the 
interplay between state serializers and Flink's
-state backends to support state schema evolution.
+如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器,
+可以参考 [自定义状态序列化器]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
+本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。
 
-## Evolving state schema
+## 升级状态数据结构
 
-To evolve the schema of a given state type, you would take the following steps:
+为了对给定的状态类型进行升级,你需要采取以下几个步骤:
 
- 1. Take a savepoint of your Flink streaming job.
- 2. Update state types in your application (e.g., modifying your Avro type 
schema).
- 3. Restore the job from the savepoint. When accessing state for the first 
time, Flink will assess whether or not
- the schema had been changed for the state, and migrate state schema if 
necessary.
+ 1. 对 Flink 流作业进行 savepoint 操作。
+ 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。
+ 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。
 
 Review comment:
   `从 savepoint 处重启作业` --> `从 savepoint 恢复作业`
   
   `当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。` --> `当第一次访问状态数据时,Flink 
会判断状态数据 schema 是否已经改变,并进行必要的迁移。`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-03 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r280969524
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
 
-Apache Flink streaming applications are typically designed to run indefinitely 
or for long periods of time.
-As with all long-running services, the applications need to be updated to 
adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they 
evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
 
 Review comment:
   I think this sentence `与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。`can be improved. 
Something like `与所有长期运行的服务一样,应用程序需要随着业务的迭代而进行调整。应用所处理的数据 schema 也会随着进行变化`, or 
you can get something better.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
bowenli86 commented on a change in pull request #8337: [FLINK-12232][hive] 
Create HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280969694
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+/**
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
+ */
+public class HiveCatalogUtil {
+
+   private HiveCatalogUtil() {
+   }
+
+   // -- Utils --
+
+   /**
+* Creates a Hive database from CatalogDatabase.
+*/
+   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
+   HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) 
db;
+
+   return new Database(
 
 Review comment:
   I got confused between 'description' and 'comment'. I've created 
https://github.com/apache/flink/pull/8340 to complete the javadoc for 
description. Will address "comment"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
bowenli86 commented on a change in pull request #8337: [FLINK-12232][hive] 
Create HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280969694
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+/**
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
+ */
+public class HiveCatalogUtil {
+
+   private HiveCatalogUtil() {
+   }
+
+   // -- Utils --
+
+   /**
+* Creates a Hive database from CatalogDatabase.
+*/
+   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
+   HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) 
db;
+
+   return new Database(
 
 Review comment:
   I got confused between 'description' and 'comment'. I've 
createdhttps://github.com/apache/flink/pull/8340 to complete the javadoc for 
description. Will address "comment"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces

2019-05-03 Thread GitBox
flinkbot commented on issue #8340: [FLINK-12395][table] Add more detailed 
javadoc for getDescription() and getDetailedDescription() in catalog object 
interfaces
URL: https://github.com/apache/flink/pull/8340#issuecomment-489295083
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces

2019-05-03 Thread GitBox
bowenli86 commented on issue #8340: [FLINK-12395][table] Add more detailed 
javadoc for getDescription() and getDetailedDescription() in catalog object 
interfaces
URL: https://github.com/apache/flink/pull/8340#issuecomment-489295088
 
 
   cc @xuefuz 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12395) Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces

2019-05-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12395:
---
Labels: pull-request-available  (was: )

> Add more detailed javadoc for getDescription() and getDetailedDescription() 
> in catalog object interfaces
> 
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> We need to add more javadoc for {{getDescritpion()}} and 
> {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which 
> cases they will be called, and how we expect developers to implement them



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 opened a new pull request #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces

2019-05-03 Thread GitBox
bowenli86 opened a new pull request #8340: [FLINK-12395][table] Add more 
detailed javadoc for getDescription() and getDetailedDescription() in catalog 
object interfaces
URL: https://github.com/apache/flink/pull/8340
 
 
   ## What is the purpose of the change
   
   This PR adds more detailed javadoc  for getDescription() and 
getDetailedDescription() in catalog object interfaces, to help developers 
better understand when these APIs are expected to be called.
   
   ## Brief change log
   
 - Add more detailed javadoc for getDescription() and 
getDetailedDescription() in catalog object interfaces
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12395) Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces

2019-05-03 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12395:
-
Summary: Add more detailed javadoc for getDescription() and 
getDetailedDescription() in catalog object interfaces  (was: Document 
description and detailed description in catalog object interfaces)

> Add more detailed javadoc for getDescription() and getDetailedDescription() 
> in catalog object interfaces
> 
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We need to add more javadoc for {{getDescritpion()}} and 
> {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which 
> cases they will be called, and how we expect developers to implement them



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12395) Document description and detailed description in catalog object interfaces

2019-05-03 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12395:
-
Summary: Document description and detailed description in catalog object 
interfaces  (was: Document description and detailed description in 
CatalogBaseTable)

> Document description and detailed description in catalog object interfaces
> --
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We need to add more javadoc for {{getDescritpion()}} and 
> {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which 
> cases they will be called, and how we expect developers to implement them



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12395) Document description and detailed description in catalog object interfaces

2019-05-03 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-12395:


Assignee: Bowen Li

> Document description and detailed description in catalog object interfaces
> --
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We need to add more javadoc for {{getDescritpion()}} and 
> {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which 
> cases they will be called, and how we expect developers to implement them



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create 
HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280969214
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+/**
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
+ */
+public class HiveCatalogUtil {
+
+   private HiveCatalogUtil() {
+   }
+
+   // -- Utils --
+
+   /**
+* Creates a Hive database from CatalogDatabase.
+*/
+   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
+   HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) 
db;
+
+   return new Database(
 
 Review comment:
   Just curious, is there any use for "comment"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create 
HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280969532
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A catalog implementation for Hive.
+ */
+public class HiveCatalog extends HiveCatalogBase {
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   super(catalogName, hivemetastoreURI);
+
+   LOG.info("Created HiveCatalog '{}'", catalogName);
+   }
+
+   public HiveCatalog(String catalogName, HiveConf hiveConf) {
+   super(catalogName, hiveConf);
+
+   LOG.info("Created HiveCatalog '{}'", catalogName);
+   }
+
+   // -- databases --
+
+   @Override
+   public CatalogDatabase getDatabase(String databaseName)
+   throws DatabaseNotExistException, CatalogException {
+   Database hiveDb;
+
+   try {
+   hiveDb = client.getDatabase(databaseName);
+   } catch (NoSuchObjectException e) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get database %s from 
%s", databaseName, catalogName), e);
+   }
+
+   return new HiveCatalogDatabase(
+   hiveDb.getParameters(), hiveDb.getLocationUri(), 
hiveDb.getDescription());
+   }
+
+   @Override
+   public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException, CatalogException {
+   try {
+   
client.createDatabase(HiveCatalogUtil.createHiveDatabase(name, database));
 
 Review comment:
   I think the logics related to hive metastore client can be shared. For 
example, createDatabase() only needs a hive database object, which might be 
created differently in the two catalogs. However, once such a object is 
created, the remaining logic is the same and can be shared.


This is an automated message from the Apache Git

[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create 
HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280969550
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A catalog implementation for Hive.
+ */
+public class HiveCatalog extends HiveCatalogBase {
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   super(catalogName, hivemetastoreURI);
+
+   LOG.info("Created HiveCatalog '{}'", catalogName);
+   }
+
+   public HiveCatalog(String catalogName, HiveConf hiveConf) {
+   super(catalogName, hiveConf);
+
+   LOG.info("Created HiveCatalog '{}'", catalogName);
+   }
+
+   // -- databases --
+
+   @Override
+   public CatalogDatabase getDatabase(String databaseName)
+   throws DatabaseNotExistException, CatalogException {
+   Database hiveDb;
+
+   try {
+   hiveDb = client.getDatabase(databaseName);
+   } catch (NoSuchObjectException e) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get database %s from 
%s", databaseName, catalogName), e);
+   }
+
+   return new HiveCatalogDatabase(
+   hiveDb.getParameters(), hiveDb.getLocationUri(), 
hiveDb.getDescription());
+   }
+
+   @Override
+   public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException, CatalogException {
+   try {
+   
client.createDatabase(HiveCatalogUtil.createHiveDatabase(name, database));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, name);
+   }
+   } catch (TException e) {
+   throw new CatalogException(String.format("Failed to 
create database %s", name), e);
+   }
+   }
+
+   @Override
+

[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create 
HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280968990
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A hive catalog database implementation.
+ */
+public class HiveCatalogDatabase implements CatalogDatabase {
+   // Property of the database
+   private final Map properties;
+   // HDFS path of the database
+   private String location;
+   // Comment of the database
+   private String comment = "This is a generic catalog database.";
 
 Review comment:
   This shouldn't be a "generic" catalog, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create 
HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280969275
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogDatabase.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A hive catalog database implementation.
+ */
+public class HiveCatalogDatabase implements CatalogDatabase {
+   // Property of the database
+   private final Map properties;
+   // HDFS path of the database
+   private String location;
+   // Comment of the database
+   private String comment = "This is a generic catalog database.";
+
+   public HiveCatalogDatabase(Map properties) {
 
 Review comment:
   In other meta classes, properties are optional, where here it's required. 
Could we make it consistent? If all three variables are optional, we should 
have a default constructor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-03 Thread GitBox
xuefuz commented on a change in pull request #8337: [FLINK-12232][hive] Create 
HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#discussion_r280969459
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A catalog implementation for Hive.
+ */
+public class HiveCatalog extends HiveCatalogBase {
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   super(catalogName, hivemetastoreURI);
+
+   LOG.info("Created HiveCatalog '{}'", catalogName);
+   }
+
+   public HiveCatalog(String catalogName, HiveConf hiveConf) {
+   super(catalogName, hiveConf);
+
+   LOG.info("Created HiveCatalog '{}'", catalogName);
+   }
+
+   // -- databases --
+
+   @Override
+   public CatalogDatabase getDatabase(String databaseName)
+   throws DatabaseNotExistException, CatalogException {
+   Database hiveDb;
+
+   try {
 
 Review comment:
   Maybe we should extract this part to the base class, as the logic is the 
same for generic and hive.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12395) Document description and detailed description in CatalogBaseTable

2019-05-03 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832986#comment-16832986
 ] 

Bowen Li edited comment on FLINK-12395 at 5/4/19 5:05 AM:
--

Agree to improve the javadoc, as I've also been confused...  I'll add javadoc 
for in which cases they are expected to be called.



was (Author: phoenixjiangnan):
Agree to improve the javadoc, as I've also been confused...  I'll add javadoc 
for in which cases they are expected to be called.

I will put "comment" into table properties map



> Document description and detailed description in CatalogBaseTable
> -
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We need to add more javadoc for {{getDescritpion()}} and 
> {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which 
> cases they will be called, and how we expect developers to implement them



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11637) Translate "Checkpoints" page into Chinese

2019-05-03 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832987#comment-16832987
 ] 

Congxian Qiu(klion26) commented on FLINK-11637:
---

[~lsy] I'm not working on this now, if you want to contribute, please feel free 
to assign it to yourself.

> Translate "Checkpoints" page into Chinese
> -
>
> Key: FLINK-11637
> URL: https://issues.apache.org/jira/browse/FLINK-11637
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> doc locates in flink/docs/ops/state/checkpoints.md



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12395) Document description and detailed description in CatalogBaseTable

2019-05-03 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12395:
-
Description: We need to add more javadoc for {{getDescritpion()}} and 
{{getDetailedDescription()}} APIs in CatalogBaseTable, including in which cases 
they will be called, and how we expect developers to implement them  (was: 
CatalogBaseTable has {{getDescritpion()}} and {{getDetailedDescription()}} API, 
which don't seem to make such sense. I'm not sure what's the use case of 
detailed description, and how should users specify it in SQL and table api. 
Probably should remove detailed description and rename "description" to 
"comment".

Besides, for simplicity, we should consider just treating "comment" as a 
property in properties map.
)

> Document description and detailed description in CatalogBaseTable
> -
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> We need to add more javadoc for {{getDescritpion()}} and 
> {{getDetailedDescription()}} APIs in CatalogBaseTable, including in which 
> cases they will be called, and how we expect developers to implement them



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12395) Reconcile description and detailed description in CatalogBaseTable

2019-05-03 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832986#comment-16832986
 ] 

Bowen Li commented on FLINK-12395:
--

Agree to improve the javadoc, as I've also been confused...  I'll add javadoc 
for in which cases they are expected to be called.

I will put "comment" into table properties map



> Reconcile description and detailed description in CatalogBaseTable
> --
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> CatalogBaseTable has {{getDescritpion()}} and {{getDetailedDescription()}} 
> API, which don't seem to make such sense. I'm not sure what's the use case of 
> detailed description, and how should users specify it in SQL and table api. 
> Probably should remove detailed description and rename "description" to 
> "comment".
> Besides, for simplicity, we should consider just treating "comment" as a 
> property in properties map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12395) Document description and detailed description in CatalogBaseTable

2019-05-03 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12395:
-
Summary: Document description and detailed description in CatalogBaseTable  
(was: Reconcile description and detailed description in CatalogBaseTable)

> Document description and detailed description in CatalogBaseTable
> -
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> CatalogBaseTable has {{getDescritpion()}} and {{getDetailedDescription()}} 
> API, which don't seem to make such sense. I'm not sure what's the use case of 
> detailed description, and how should users specify it in SQL and table api. 
> Probably should remove detailed description and rename "description" to 
> "comment".
> Besides, for simplicity, we should consider just treating "comment" as a 
> property in properties map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12240) Support view related operations in GenericHiveMetastoreCatalog

2019-05-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12240:
---
Labels: pull-request-available  (was: )

> Support view related operations in GenericHiveMetastoreCatalog
> --
>
> Key: FLINK-12240
> URL: https://issues.apache.org/jira/browse/FLINK-12240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Support view related operations in GenericHiveMetastoreCatalog, which 
> implements ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-03 Thread GitBox
flinkbot commented on issue #8339: [FLINK-12240][hive] Support view related 
operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339#issuecomment-489287613
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8339: [FLINK-12240][hive] Support view related operations in GenericHiveMetastoreCatalog

2019-05-03 Thread GitBox
bowenli86 opened a new pull request #8339: [FLINK-12240][hive] Support view 
related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8339
 
 
   ## What is the purpose of the change
   
   This PR added support for views in GenericHiveMetastoreCatalog, operations 
include create, drop, alter, rename, list.
   
   ## Brief change log
   
   - Implemented view related APIs in GenericHiveMetastoreCatalog
   - Reused and moved view related unit tests from GenericInMemoryCatalog to 
CatalogTestBase
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Tests moved to CatalogTestBase
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   
   This PR depends on https://github.com/apache/flink/pull/8329


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8338: [BP-1.6][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators

2019-05-03 Thread GitBox
flinkbot commented on issue #8338: [BP-1.6][FLINK-12296][StateBackend] Fix 
local state directory collision with state loss for chained keyed operators
URL: https://github.com/apache/flink/pull/8338#issuecomment-489284633
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 opened a new pull request #8338: [BP-1.6][FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators

2019-05-03 Thread GitBox
klion26 opened a new pull request #8338: [BP-1.6][FLINK-12296][StateBackend] 
Fix local state directory collision with state loss for chained keyed operators
URL: https://github.com/apache/flink/pull/8338
 
 
   ## What is the purpose of the change
   
   cherry-pick #8263 to release-1.6 manually.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   
`flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java#testMultipleStatefulOperatorChainedSnapshotAndRestore`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   
   @StefanRRichter 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12399) FilterableTableSource does not use filters on job run

2019-05-03 Thread Josh Bradt (JIRA)
Josh Bradt created FLINK-12399:
--

 Summary: FilterableTableSource does not use filters on job run
 Key: FLINK-12399
 URL: https://issues.apache.org/jira/browse/FLINK-12399
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.8.0
Reporter: Josh Bradt
 Attachments: flink-filter-bug.tar.gz

As discussed [on the mailing 
list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
 there appears to be a bug where a job that uses a custom FilterableTableSource 
does not keep the filters that were pushed down into the table source. More 
specifically, the table source does receive filters via applyPredicates, and a 
new table source with those filters is returned, but the final job graph 
appears to use the original table source, which does not contain any filters.

I attached a minimal example program to this ticket. The custom table source is 
as follows: 
{code:java}
public class CustomTableSource implements BatchTableSource, 
FilterableTableSource {

private static final Logger LOG = 
LoggerFactory.getLogger(CustomTableSource.class);

private final Filter[] filters;

private final FilterConverter converter = new FilterConverter();

public CustomTableSource() {
this(null);
}

private CustomTableSource(Filter[] filters) {
this.filters = filters;
}

@Override
public DataSet getDataSet(ExecutionEnvironment execEnv) {
if (filters == null) {
   LOG.info(" No filters defined ");
} else {
LOG.info(" Found filters ");
for (Filter filter : filters) {
LOG.info("FILTER: {}", filter);
}
}

return execEnv.fromCollection(allModels());
}

@Override
public TableSource applyPredicate(List predicates) {
LOG.info("Applying predicates");

List acceptedFilters = new ArrayList<>();
for (final Expression predicate : predicates) {
converter.convert(predicate).ifPresent(acceptedFilters::add);
}

return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
}

@Override
public boolean isFilterPushedDown() {
return filters != null;
}

@Override
public TypeInformation getReturnType() {
return TypeInformation.of(Model.class);
}

@Override
public TableSchema getTableSchema() {
return TableSchema.fromTypeInfo(getReturnType());
}

private List allModels() {
List models = new ArrayList<>();

models.add(new Model(1, 2, 3, 4));
models.add(new Model(10, 11, 12, 13));
models.add(new Model(20, 21, 22, 23));

return models;
}
}
{code}
 

When run, it logs
{noformat}
15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource  
 - Applying predicates
15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource  
 - Applying predicates
15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource  
 - Applying predicates
15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource  
 -  No filters defined {noformat}
which appears to indicate that although filters are getting pushed down, the 
final job does not use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12398) Support partitioned view in catalog API

2019-05-03 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12398:


 Summary: Support partitioned view in catalog API
 Key: FLINK-12398
 URL: https://issues.apache.org/jira/browse/FLINK-12398
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Bowen Li
Assignee: Bowen Li


Partitioned view is not a rare thing in common databases:

SQL Server: 
https://docs.microsoft.com/en-us/sql/t-sql/statements/create-view-transact-sql?view=sql-server-2017#partitioned-views
Oracle: https://docs.oracle.com/cd/A57673_01/DOC/server/doc/A48506/partview.htm
Hive: https://cwiki.apache.org/confluence/display/Hive/PartitionedViews

The work may include moving {{isPartitioend()}} and {{getPartitionKeys}} from 
{{CatalogTable}} to {{CatalogBaseTable}}, and other changes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12397) Drop flink-shaded-asm-5

2019-05-03 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-12397:


 Summary: Drop flink-shaded-asm-5
 Key: FLINK-12397
 URL: https://issues.apache.org/jira/browse/FLINK-12397
 Project: Flink
  Issue Type: Improvement
  Components: BuildSystem / Shaded
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: shaded-7.0


With FLINK-12390 being resolved there's no need to further maintain 
flink-shaded-asm-5, so let's remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12390) Fully migrate to asm6

2019-05-03 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12390.

Resolution: Fixed

master: 9975e0393a9e09dbde21ca61f1a5751cc5934411

> Fully migrate to asm6
> -
>
> Key: FLINK-12390
> URL: https://issues.apache.org/jira/browse/FLINK-12390
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We currently use a mix of asm5/asm6, let's migrate completely so we can drop 
> a dependency and remove a module from flink-shaded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12396) KafkaITCase.testOneSourceMultiplePartitions doesn't fail properly

2019-05-03 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12396:


 Summary: KafkaITCase.testOneSourceMultiplePartitions doesn't fail 
properly
 Key: FLINK-12396
 URL: https://issues.apache.org/jira/browse/FLINK-12396
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.9.0
Reporter: Bowen Li
 Fix For: 1.9.0


https://api.travis-ci.org/v3/job/527599974/log.txt

In the log, we can see that KafkaITCase.testOneSourceMultiplePartitions failed, 
but it kept running and doing all the snapshot, which caused the built to 
timeout.

{code:java}
05:00:38,896 INFO  
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink 
 - Snapshot of counter 4800 at checkpoint 115
05:00:39,050 ERROR org.apache.flink.streaming.connectors.kafka.KafkaITCase  
 - 

Test 
testOneSourceMultiplePartitions(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
 failed with:
org.junit.runners.model.TestTimedOutException: test timed out after 6 
milliseconds
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
at 
org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
at 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneSourceMultiplePartitionsExactlyOnceTest(KafkaConsumerTestBase.java:924)
at 
org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneSourceMultiplePartitions(KafkaITCase.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)


05:00:39,057 INFO  org.apache.flink.streaming.connectors.kafka.KafkaITCase  
 - 

Test 
testCancelingFullTopic(org.apache.flink.streaming.connectors.kafka.KafkaITCase) 
is running.

05:00:39,396 INFO  
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink 
 - Snapshot of counter 4800 at checkpoint 116
05:00:39,896 INFO  
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink 
 - Snapshot of counter 4800 at checkpoint 117
05:00:40,396 INFO  
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink 
 - Snapshot of counter 4800 at checkpoint 118
05:00:40,896 INFO  
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink 
 - Snapshot of counter 4800 at checkpoint 119
05:00:41,396 INFO  
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink 
 - Snapshot of counter 4800 at checkpoint 120
{code}





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol merged pull request #8332: [FLINK-12390][build] Fully migrate to flink-shaded-asm-6

2019-05-03 Thread GitBox
zentol merged pull request #8332: [FLINK-12390][build] Fully migrate to 
flink-shaded-asm-6
URL: https://github.com/apache/flink/pull/8332
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-03 Thread GitBox
bowenli86 commented on issue #8214: [FLINK-11476] [table] Create CatalogManager 
to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#issuecomment-489180854
 
 
   Thanks @dawidwys for the feedback! Let's sync up on Monday offline as planned


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-03 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280866583
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
 rels.map(_.asInstanceOf[ExecNode[_, _]])
   }
 
+  /**
+* Register an [[ReadableCatalog]] under a unique name.
+*
+* @param name the name under which the catalog will be registered
+* @param catalog the catalog to register
+* @throws CatalogAlreadyExistsException thrown if the catalog already 
exists
+*/
+  @throws[CatalogAlreadyExistsException]
+  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+catalogManager.registerCatalog(name, catalog)
+  }
+
+  /**
+* Get a registered [[ReadableCatalog]].
+*
+* @param catalogName name of the catalog to get
+* @return the requested catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+*/
+  @throws[CatalogNotExistException]
+  def getCatalog(catalogName: String): ReadableCatalog = {
+catalogManager.getCatalog(catalogName)
+  }
+
+  /**
+* Get the current catalog.
+*
+* @return the current catalog in CatalogManager
+*/
+  def getCurrentCatalog(): ReadableCatalog = {
+catalogManager.getCurrentCatalog
+  }
+
+  /**
+* Get the current database name.
+*
+* @return the current database of the current catalog
+*/
+  def getCurrentDatabaseName(): String = {
+catalogManager.getCurrentCatalog.getCurrentDatabase
+  }
+
+  /**
+* Set the current catalog.
+*
+* @param name name of the catalog to set as current catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+*/
+  @throws[CatalogNotExistException]
+  def setCurrentCatalog(name: String): Unit = {
+catalogManager.setCurrentCatalog(name)
+  }
+
+  /**
+* Set the current catalog and current database.
+*
+* @param catalogName name of the catalog to set as current catalog
+* @param databaseName name of the database to set as current database
+* @throws CatalogNotExistException  thrown if the catalog doesn't exist
+* @throws DatabaseNotExistException thrown if the database doesn't exist
+*/
+  @throws[CatalogNotExistException]
+  @throws[DatabaseNotExistException]
+  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+catalogManager.setCurrentCatalog(catalogName)
+catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
 
 Review comment:
   I agree the "current database" thing is now confusing because it actually 
presents two things: 1) the database of the session if its catalog is current; 
2) which will be the current db when users switch to its catalog, it will be 
the "default_db" in each catalog if users don't overwrite it, that's why 
CatalogManager needs to keep a map if we wanna extract it out. We separated 
these two concepts In Blink, and we probably do that too in Flink by replacing 
`get/setCurrentDatabase` with `String getDefaultDatabase()` (if that's way, we 
should do it in a different JIRA/PR).
   
   IMHO, this is not important at this moment, as it's only a small usability 
thing from users' perspective. I can remove parts related to 
`setCurrentDatabase` from this PR, and we can bake it for more time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-03 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280858643
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager implementation for Flink.
+ * TODO: [FLINK-11275] Decouple CatalogManager with Calcite
+ *   Idealy FlinkCatalogManager should be in flink-table-api-java module.
+ *   But due to that it currently depends on Calcite, a dependency that 
flink-table-api-java doesn't have right now.
+ *   We temporarily put FlinkCatalogManager in flink-table-planner-blink.
+ */
+public class FlinkCatalogManager implements CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalogManager.class);
+
+   public static final String BUILTIN_CATALOG_NAME = "builtin";
+
+   // The catalog to hold all registered and translated tables
+   // We disable caching here to prevent side effects
+   private CalciteSchema internalSchema = 
CalciteSchema.createRootSchema(false, false);
+   private SchemaPlus rootSchema = internalSchema.plus();
+
+   // A map between names and catalogs.
+   private Map catalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   public FlinkCatalogManager() {
+   LOG.info("Initializing FlinkCatalogManager");
+   catalogs = new HashMap<>();
+
+   GenericInMemoryCatalog inMemoryCatalog = new 
GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
 
 Review comment:
   I think access control/authorization is orthogonal to this problem.
   
   I'd argue that we probably should allow some types of catalog to be readable 
only, and we just don't enter data into them but only read data from them. With 
the example of Confluent Registry, it has really nice features such as fancy 
UI, schema evolution, compatibility check, etc. If users are already using that 
to manage their kafka schemas, why would they wanna use something else that 
don't support all these features or not natively? Support writing in Flink's 
Registry Catalog would be vain IMHO.
   
   The `registerTable` is a part of `TableEnvironment` in the example. With the 
use case above in mind, having only a`Catalog` interface and letting a catalog 
implement 13+ methods (the # can grow) that it doesn't support seem to be not a 
good design.
   
   I wonder whether we should add a 
`CatalogManager#getReadableWritableCatalog()` API, which can perform checks in 
side and `TableEnvironment` won't worry about checks anymore.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on issue #8332: [FLINK-12390][build] Fully migrate to flink-shaded-asm-6

2019-05-03 Thread GitBox
GJL commented on issue #8332: [FLINK-12390][build] Fully migrate to 
flink-shaded-asm-6
URL: https://github.com/apache/flink/pull/8332#issuecomment-489154787
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation

2019-05-03 Thread GitBox
sjwiesman commented on a change in pull request #8326: [FLINK-12378][docs] 
Consolidate FileSystem Documentation
URL: https://github.com/apache/flink/pull/8326#discussion_r280830610
 
 

 ##
 File path: docs/ops/deployment/aws.md
 ##
 @@ -64,81 +64,11 @@ HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m 
yarn-cluster -yn 1 examples/
 
 {% top %}
 
-## S3: Simple Storage Service
-
-[Amazon Simple Storage Service](http://aws.amazon.com/s3/) (Amazon S3) 
provides cloud object storage for a variety of use cases. You can use S3 with 
Flink for **reading** and **writing data** as well in conjunction with the 
[streaming **state backends**]({{ site.baseurl}}/ops/state/state_backends.html) 
or even as a YARN object storage.
-
-You can use S3 objects like regular files by specifying paths in the following 
format:
-
-{% highlight plain %}
-s3:///
-{% endhighlight %}
-
-The endpoint can either be a single file or a directory, for example:
-
-{% highlight java %}
-// Read from S3 bucket
-env.readTextFile("s3:///");
-
-// Write to S3 bucket
-stream.writeAsText("s3:///");
-
-// Use S3 as FsStatebackend
-env.setStateBackend(new FsStateBackend("s3:///"));
-{% endhighlight %}
-
-Note that these examples are *not* exhaustive and you can use S3 in other 
places as well, including your [high availability 
setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ 
site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); 
everywhere that Flink expects a FileSystem URI.
-
-For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto` S3
-filesystem wrappers which are fairly easy to set up. For some cases, however, 
e.g. for using S3 as
-YARN's resource storage dir, it may be necessary to set up a specific Hadoop 
S3 FileSystem
-implementation. Both ways are described below.
-
-### Shaded Hadoop/Presto S3 file systems (recommended)
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the 
respective JAR file from the
-`opt` directory to the `lib` directory of your Flink distribution before 
starting Flink, e.g.
-
-{% highlight bash %}
-cp ./opt/flink-s3-fs-presto-{{ site.version }}.jar ./lib/
-{% endhighlight %}
-
-Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
-wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers
-for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can
-use this to use both at the same time.
-
- Configure Access Credentials
-
-After setting up the S3 FileSystem wrapper, you need to make sure that Flink 
is allowed to access your S3 buckets.
-
-# Identity and Access Management (IAM) (Recommended)
-
-The recommended way of setting up credentials on AWS is via [Identity and 
Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need in order to access S3 buckets. Details about how to do this are beyond the 
scope of this documentation. Please refer to the AWS user guide. What you are 
looking for are [IAM 
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).
-
-If you set this up correctly, you can manage access to S3 within AWS and don't 
need to distribute any access keys to Flink.
-
-# Access Keys (Discouraged)
-
-Access to S3 can be granted via your **access and secret key pair**. Please 
note that this is discouraged since the [introduction of IAM 
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
-
-You need to configure both `s3.access-key` and `s3.secret-key`  in Flink's  
`flink-conf.yaml`:
-
-{% highlight yaml %}
-s3.access-key: your-access-key
-s3.secret-key: your-secret-key
-{% endhighlight %}
-
-{% top %}
-
-### Hadoop-provided S3 file systems - manual setup
+### Hadoop-provided S3 file systems
 
 {% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
 
-This setup is a bit more complex and we recommend using our shaded 
Hadoop/Presto file systems
-instead (see above) unless required otherwise, e.g. for using S3 as YARN's 
resource storage dir
+Apache Flink provides native [S3 FileSystem's](../filesystems/s3.html) out of 
the box and we recomend using them unless required otherwise, e.g. for using S3 
as YARN's resource storage dir
 
 Review comment:
   I think that's reasonable. I believe most users are using the built-in S3 
filesystems at this point. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries a

[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-05-03 Thread GitBox
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support 
for AzureFS
URL: https://github.com/apache/flink/pull/8117#issuecomment-489140860
 
 
   Ok I think I didn't catch the issue as I was using mvn 3.5 and it seems that 
the shade plugin behaves a bit differently on it. When I build with 3.2.5 (what 
we default to in the project), I was able to trigger the failure. 
   Seems to be due to the flink-s3-fs-base module filtering all the classes in 
the util package (where I had placed the HadoopConfigLoader). I'm not really 
sure on the context of the original change, I've updated to only filter the 
HadoopUtils class (no other classes in that package). 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280777174
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280761093
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
 
 Review comment:
   We discussed in Slack about using state transitions but did not commit to 
this decision yet. I think we should continue relying on 
`onPartitionConsumable` for now. As @shuai-xu already mentioned in Slack, there 
may be operators that take a very long time until they produce a result, e.g., 
aggregations, sorting, etc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280771723
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280769132
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
 
 Review comment:
   This check should not be necessary. The call 
`schedulerOperations.allocateSlotsAndDeploy()` should also work if the input 
collection is empty.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280761548
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280713033
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
 
 Review comment:
   If the input set of `ExecutionVertexIDs` contains all execution vertices, 
this strategy degrades to the `EagerSchedulingStrategy`.  Conceptually there is 
no difference between `restartTasks` and `startScheduling`. Both methods 
iterate over a set of vertices and schedule those whose input dependencies are 
fulfilled. The difference is that in `rest

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280786571
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
 
 Review comment:
   I think this method is a bit too long. It could be split into more units 
(see Clean Code by Robert C Martin, Chapter 3).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280787008
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   JobVertex[] jobVertices = new JobVertex[6];
+
+   for (int i = 0; i < 6; i++) {
+   jobVertices[i] = new JobVertex("vertex#" + i);
+   }
+
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[0], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[1], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[4].connectNewDataSetAsInput(jobVertices[3], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280713503
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
 
 Review comment:
   I don't think accessing the `jobGraph` is necessary here. Isn't this the 
same as `schedulingVertex.getConsumedResultPartitions().isEmpty()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280795534
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   JobVertex[] jobVertices = new JobVertex[6];
+
+   for (int i = 0; i < 6; i++) {
+   jobVertices[i] = new JobVertex("vertex#" + i);
+   }
+
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[0], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[1], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[4].connectNewDataSetAsInput(jobVertices[3], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280766308
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280769682
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
 
 Review comment:
   Returning `null` should be avoided. All `SchedulingVertex` implementations 
should be required to return an empty collection instead. The null check can be 
removed then.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280777174
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280762347
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Simple implementation of {@link SchedulingResultPartition} for testing.
+ */
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
+   private final IntermediateDataSetID intermediateDataSetID;
+
+   private final IntermediateResultPartitionID 
intermediateResultPartitionID;
+
+   private final ResultPartitionType partitionType;
+
+   private final SchedulingVertex producer;
+
+   private Collection consumers;
+
+   TestingSchedulingResultPartition(IntermediateDataSetID dataSetID,
+   IntermediateResultPartitionID partitionID, 
ResultPartitionType type, SchedulingVertex producer,
+   Collection consumers) {
+   this.intermediateDataSetID = dataSetID;
+   this.intermediateResultPartitionID = partitionID;
+   this.partitionType = type;
+   this.producer = producer;
+   this.consumers = consumers;
+   }
+
+   @Override
+   public IntermediateResultPartitionID getId() {
+   return intermediateResultPartitionID;
+   }
+
+   @Override
+   public IntermediateDataSetID getResultId() {
+   return intermediateDataSetID;
+   }
+
+   @Override
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   @Override
+   public ResultPartitionState getState() {
+   return ResultPartitionState.DONE;
+   }
+
+   @Override
+   public SchedulingVertex getProducer() {
+   return producer;
+   }
+
+   @Override
+   public Collection getConsumers() {
+   return consumers;
+   }
+
+   public void setConsumers(Collection consumers) {
+   this.consumers = consumers;
 
 Review comment:
   I think that `null` should not be allowed 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280762928
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Simple implementation of {@link SchedulingResultPartition} for testing.
+ */
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
+   private final IntermediateDataSetID intermediateDataSetID;
+
+   private final IntermediateResultPartitionID 
intermediateResultPartitionID;
+
+   private final ResultPartitionType partitionType;
+
+   private final SchedulingVertex producer;
+
+   private Collection consumers;
 
 Review comment:
   This field can be initialized with the constant `Collections.emptyList()`. 
The getter `getConsumers()` should never return `null`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280761093
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
 
 Review comment:
   We discussed in Slack using state transitions but did not commit to this 
decision yet. I think we should continue relying on `onPartitionConsumable` for 
now. As @shuai-xu already mentioned in Slack, there may be operators that take 
a very long time until they produce a result, e.g., aggregations, sorting, etc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With r

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280703833
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280777824
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] twalthr commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
twalthr commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280798765
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of a time WITHOUT timezone consisting of {@code 
hour:minute:second[.fractional]} with up
 
 Review comment:
   The SQL standard also defines a `TIME WITH TIME ZONE`. I will explicitly 
mention that we won't support that. Java does also not provide a time class for 
such a type.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-03 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r280775926
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   

[GitHub] [flink] sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-03 Thread GitBox
sjwiesman commented on issue #8330: [FLINK-12388][docs] Update the production 
readiness checklist
URL: https://github.com/apache/flink/pull/8330#issuecomment-489107895
 
 
   Thanks for the review, I've updated the JobManager section to be more clear 
and also fixed a few other grammar issues I noticed on the second pass. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-05-03 Thread GitBox
StephanEwen edited a comment on issue #8280: [FLINK-12297]Harden ClosureCleaner 
to handle the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-489105632
 
 
   This PR does a few things at the same time:
 1. It recurses into fields, cleaning them.
 2. It skips closure cleaning of functions that have a `writeObject` method 
or are `Externalizable`.
 3. It adds checks to skip classes that are not inner classes.
   
   Recursion into non-transient and non-static fields probably solves the 
"wrapping function" problem, but it does much more than that. While this can be 
a good addition, we definitely need a way to turn this off, so we need to 
extend the closure cleaner configuration to support "don't clean", "clean", and 
"clean recursive".
   
   I think that (2) and (3) are good additions, but the check should probably 
be at the top level, not only before checking fields. Especially (3) would help 
make this faster, because many classes can be skipped right away.
   
   This check should be simpler, though. We don't need to distinguish 
member/local/anonymous/etc classes.
   ```java
   public static void clean(Object function) {
   if (function == null) {
   return
   }
   
   final Class clazz = function.getClass();
   if (clazz.getEnclosingClass() == null || 
!Modifiers.isStatic(clazz.getModifiers())) {
   return;
   }
   
   // do the cleaning
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-05-03 Thread GitBox
StephanEwen commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to 
handle the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-489105632
 
 
   This PR does a few things at the same time:
 1. It recurses into fields, cleaning them.
 2. It skips closure cleaning of functions that have a `writeObject` method 
or are `Externalizable`.
 3. It adds checks to skip classes that are not inner classes.
   
   Recursion into non-transient and non-static fields probably solves the 
"wrapping function" problem, but it does much more than that. While this can be 
a good addition, we definitely need a way to turn this off, so we need to 
extend the closure cleaner configuration to support "don't clean", "clean", and 
"clean recursive".
   
   I think that (2) and (3) are good additions, but the check should probably 
be at the top level, not only before checking fields. Especially (3) would help 
make this faster, because many classes can be skipped right away.
   
   This check should be simpler, though:
   ```java
   public static void clean(Object function) {
   if (function == null) {
   return
   }
   
   final Class clazz = function.getClass();
   if (clazz.getEnclosingClass() == null || 
!Modifiers.isStatic(clazz.getModifiers())) {
   return;
   }
   
   // do the cleaning
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources

2019-05-03 Thread GitBox
StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update 
Apache Avro test to use try-with-resources
URL: https://github.com/apache/flink/pull/8282#discussion_r280777846
 
 

 ##
 File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
 ##
 @@ -107,23 +107,23 @@ private void serializeAndDeserialize(final 
AvroOutputFormat.Codec codec, final S
outputFormat.setSchema(schema);
}
 
-   final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
-   // when
-   try (final ObjectOutputStream oos = new 
ObjectOutputStream(bos)) {
-   oos.writeObject(outputFormat);
-   }
-   try (final ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray( {
-   // then
-   Object o = ois.readObject();
-   assertTrue(o instanceof AvroOutputFormat);
-   @SuppressWarnings("unchecked")
-   final AvroOutputFormat restored = 
(AvroOutputFormat) o;
-   final AvroOutputFormat.Codec restoredCodec = 
(AvroOutputFormat.Codec) Whitebox.getInternalState(restored, "codec");
-   final Schema restoredSchema = (Schema) 
Whitebox.getInternalState(restored, "userDefinedSchema");
-
-   assertTrue(codec != null ? restoredCodec == codec : 
restoredCodec == null);
-   assertTrue(schema != null ? 
restoredSchema.equals(schema) : restoredSchema == null);
+   try (final ByteArrayOutputStream bos = new 
ByteArrayOutputStream()) {
+   // when
+   try (final ObjectOutputStream oos = new 
ObjectOutputStream(bos)) {
+   oos.writeObject(outputFormat);
+   }
+   try (final ByteArrayInputStream bais = new 
ByteArrayInputStream(bos.toByteArray());
+   final ObjectInputStream ois = new 
ObjectInputStream(bais)) {
 
 Review comment:
   same as above, would not separate the nested streams/readers


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources

2019-05-03 Thread GitBox
StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update 
Apache Avro test to use try-with-resources
URL: https://github.com/apache/flink/pull/8282#discussion_r280776846
 
 

 ##
 File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
 ##
 @@ -56,8 +56,9 @@
public void testDefaultKryoRegisteredClassesDidNotChange() throws 
Exception {
final Kryo kryo = new KryoSerializer<>(Integer.class, new 
ExecutionConfig()).getKryo();
 
-   try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(
 
 Review comment:
   I think changes like this one are not necessary. The `BufferedReader` 
forwards the `close()` call to the nested `InputStreamReader`. So there is no 
need to have them separate in the `try(...)` clause.
   
   I would keep the original pattern, which seems easier to read/format.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update Apache Avro test to use try-with-resources

2019-05-03 Thread GitBox
StephanEwen commented on a change in pull request #8282: [FLINK-12338] Update 
Apache Avro test to use try-with-resources
URL: https://github.com/apache/flink/pull/8282#discussion_r280778538
 
 

 ##
 File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
 ##
 @@ -66,27 +66,28 @@ public void testComplexStringsDirecty() {
 
ByteArrayOutputStream baos = new 
ByteArrayOutputStream(512);
{
-   DataOutputStream dataOut = new 
DataOutputStream(baos);
-   DataOutputEncoder encoder = new 
DataOutputEncoder();
-   encoder.setOut(dataOut);
+   try (DataOutputStream dataOut = new 
DataOutputStream(baos)) {
+   DataOutputEncoder encoder = new 
DataOutputEncoder();
+   encoder.setOut(dataOut);
 
-   encoder.writeString(testString);
-   dataOut.flush();
-   dataOut.close();
+   encoder.writeString(testString);
+   dataOut.flush();
+   }
}
 
byte[] data = baos.toByteArray();
 
// deserialize
{
-   ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-   DataInputStream dataIn = new 
DataInputStream(bais);
-   DataInputDecoder decoder = new 
DataInputDecoder();
-   decoder.setIn(dataIn);
+   try (ByteArrayInputStream bais = new 
ByteArrayInputStream(data)) {
 
 Review comment:
   same as above


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication

2019-05-03 Thread GitBox
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] 
add option to configure SSL engine provider for TM communication
URL: https://github.com/apache/flink/pull/7688#discussion_r280773729
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -195,6 +198,22 @@ public static SSLHandlerFactory 
createRestClientSSLEngineFactory(final Configura
return 
config.getString(SecurityOptions.SSL_ALGORITHMS).split(",");
}
 
+   private static SslProvider getSSLProvider(final Configuration config) {
+   checkNotNull(config, "config must not be null");
+   String providerString = 
config.getString(SecurityOptions.SSL_PROVIDER);
+   if (providerString.equalsIgnoreCase("OPENSSL")) {
+   if (OpenSsl.isAvailable()) {
+   return OPENSSL;
+   } else {
+   return JDK;
+   }
+   } else if (providerString.equalsIgnoreCase("JDK")) {
+   return JDK;
+   } else {
+   throw new IllegalArgumentException("Unknown SSL 
provider: " + providerString);
 
 Review comment:
   `IllegalArgumentException` runtime exception that doesn't use our exception 
hierarchy?  Did you mean `IllegalConfigurationException`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication

2019-05-03 Thread GitBox
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] 
add option to configure SSL engine provider for TM communication
URL: https://github.com/apache/flink/pull/7688#discussion_r280771834
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
 ##
 @@ -135,7 +134,8 @@ protected void initChannel(SocketChannel ch) {
 
// SSL should be the first handler in the 
pipeline
if (sslFactory != null) {
-   ch.pipeline().addLast("ssl", 
sslFactory.createNettySSLHandler());
+   ch.pipeline().addLast("ssl",
 
 Review comment:
   I do not understand this `[FLINK-9816][network] netty-fy SSL configuration` 
commit. What does it do? There are no new tests, no changes in the existing 
tests, no documentation and nothing in the commit message :(
   
   Is it refactor? If so please explain in the commit message what are you 
refactoring and why.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication

2019-05-03 Thread GitBox
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] 
add option to configure SSL engine provider for TM communication
URL: https://github.com/apache/flink/pull/7688#discussion_r280771141
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
 ##
 @@ -386,7 +386,8 @@ public void testCreateSSLEngineFactory() throws Exception {
serverConfig.setString(SecurityOptions.SSL_ALGORITHMS, 
"TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA256");
 
final SSLHandlerFactory serverSSLHandlerFactory = 
SSLUtils.createInternalServerSSLEngineFactory(serverConfig);
-   final SslHandler sslHandler = 
serverSSLHandlerFactory.createNettySSLHandler();
+   // note: a 'null' allocator seems to work here (probably only 
because we do not use the ssl engine!)
+   final SslHandler sslHandler = 
serverSSLHandlerFactory.createNettySSLHandler(null);
 
 Review comment:
   `io.netty.buffer.UnpooledByteBufAllocator#DEFAULT`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication

2019-05-03 Thread GitBox
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] 
add option to configure SSL engine provider for TM communication
URL: https://github.com/apache/flink/pull/7688#discussion_r280772939
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
 ##
 @@ -299,6 +300,26 @@
.defaultValue(true)
.withDescription("Flag to enable peer’s hostname 
verification during ssl handshake.");
 
+   /**
+* SSL engine provider.
+*/
+   public static final ConfigOption SSL_PROVIDER =
+   key("security.ssl.provider")
+   .defaultValue("JDK")
+   .withDescription(Description.builder()
+   .text("The SSL engine provider to use 
for the ssl transport:")
+   .list(
+   TextElement.text("%s: default 
Java-based SSL engine", TextElement.code("JDK")),
+   TextElement.text("%s: 
openSSL-based SSL engine using system libraries"
+   + " (falls back to JDK 
if not available)", TextElement.code("OPENSSL"))
 
 Review comment:
   It would be safer to fail instead of fall back if users specifies the 
openSSL explicitly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #7688: [FLINK-9816][network] add option to configure SSL engine provider for TM communication

2019-05-03 Thread GitBox
pnowojski commented on a change in pull request #7688: [FLINK-9816][network] 
add option to configure SSL engine provider for TM communication
URL: https://github.com/apache/flink/pull/7688#discussion_r280775627
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
 ##
 @@ -52,6 +60,23 @@
private static final String KEY_STORE_PASSWORD = "password";
private static final String KEY_PASSWORD = "password";
 
+   public static final List AVAILABLE_SSL_PROVIDERS;
+   static {
+   if (OpenSsl.isAvailable()) {
+   AVAILABLE_SSL_PROVIDERS = Arrays.asList("JDK", 
"OPENSSL");
 
 Review comment:
   Do we test for this somehow on travis? Manually? Is there some kind of 
instruction how to make `OpenSsl` available during testing?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-05-03 Thread GitBox
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support 
for AzureFS
URL: https://github.com/apache/flink/pull/8117#issuecomment-489094928
 
 
   Yeah let me check. I kicked off a `mvn clean install -DskipTests` on my 
machine before submitting and that seems to have gone through ok. I'll try and 
repro what the travis build is doing and see if I can trigger the failure. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8285: FLINK- Fix typo in "Window Operator" documentation

2019-05-03 Thread GitBox
flinkbot edited a comment on issue #8285: FLINK- Fix typo in "Window Operator" 
documentation
URL: https://github.com/apache/flink/pull/8285#issuecomment-487081426
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @StephanEwen [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @StephanEwen [PMC]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @StephanEwen [PMC]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @StephanEwen [PMC]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" documentation

2019-05-03 Thread GitBox
StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" 
documentation
URL: https://github.com/apache/flink/pull/8285#issuecomment-489093278
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" documentation

2019-05-03 Thread GitBox
StephanEwen commented on issue #8285: FLINK- Fix typo in "Window Operator" 
documentation
URL: https://github.com/apache/flink/pull/8285#issuecomment-489093241
 
 
   Thanks for the patch.
   
   Merging this...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280736591
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of a timestamp WITH timezone consisting of {@code 
year-month-day hour:minute:second[.fractional] zone}
+ * with up to nanosecond precision and values ranging from {@code -01-01 
00:00:00.0 +14:59} to
+ * {@code -12-31 23:59:59.9 -14:59}. Compared to the SQL standard, 
leap seconds (23:59:60 and
+ * 23:59:61) are not supported as the semantics are closer to {@link 
java.time.OffsetDateTime}.
+ *
+ * The serialized string representation is {@code TIMESTAMP(p) WITH TIME 
ZONE} where {@code p} is
+ * the number of digits of fractional seconds (=precision). {@code p} must 
have a value between 0 and
+ * 9 (both inclusive). If no precision is specified, {@code p} is equal to 6.
+ *
+ * Compared to {@link LocalZonedTimestampType}, the time zone offset 
information is physically
+ * stored in every datum. It is used individually for every computation, 
visualization, or communication
+ * to external systems.
+ *
+ * A conversion from {@link java.time.ZonedDateTime} ignores the zone ID.
+ *
+ * @see TimestampType
+ * @see LocalZonedTimestampType
+ */
+@PublicEvolving
+public final class ZonedTimestampType extends LogicalType {
+
+   private static final int MIN_PRECISION = 0;
+
+   private static final int MAX_PRECISION = 9;
+
+   private static final int DEFAULT_PRECISION = 6;
+
+   private static final String DEFAULT_FORMAT = "TIMESTAMP(%d) WITH TIME 
ZONE";
+
+   private static final Set INPUT_CONVERSION = conversionSet(
+   java.time.ZonedDateTime.class.getName(),
+   java.time.OffsetDateTime.class.getName());
+
+   private static final Set OUTPUT_CONVERSION = conversionSet(
+   java.time.OffsetDateTime.class.getName());
+
+   private static final Class DEFAULT_CONVERSION = 
java.time.OffsetDateTime.class;
+
+   /**
+* Internal timestamp kind for time attribute metadata.
+*/
+   @Internal
+   public enum TimestampKind {
 
 Review comment:
   Could we extract this and use it for all `Timestamp` types?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280742847
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.lang.reflect.Array;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of an array of elements with same subtype. Compared to the SQL 
standard, the maximum
+ * cardinality of an array cannot be specified but is fixed at {@link 
Integer#MAX_VALUE}. Also, any
+ * valid type is supported as a subtype.
+ *
+ * The serialized string representation is {@code ARRAY} where {@code t} 
is the type of
 
 Review comment:
   nit: `is the type of` -> `is the logical type of`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280737135
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of a timestamp WITH LOCAL timezone consisting of {@code 
year-month-day hour:minute:second[.fractional] zone}
+ * with up to nanosecond precision and values ranging from {@code -01-01 
00:00:00.0 +14:59} to
+ * {@code -12-31 23:59:59.9 -14:59}. Leap seconds (23:59:60 and 
23:59:61) are not
+ * supported as the semantics are closer to {@link java.time.OffsetDateTime}.
+ *
+ * The serialized string representation is {@code TIMESTAMP(p) WITH LOCAL 
TIME ZONE} where {@code p} is
+ * the number of digits of fractional seconds (=precision). {@code p} must 
have a value between 0 and
+ * 9 (both inclusive). If no precision is specified, {@code p} is equal to 6.
+ *
+ * Compared to {@link ZonedTimestampType}, the time zone offset information 
is not stored physically
+ * in ever datum. Instead, the type assumes {@link java.time.Instant} 
semantics in UTC time zone at
 
 Review comment:
   `ever` -> `every`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280764585
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
 ##
 @@ -412,6 +419,40 @@ public void testNullType() {
assertFalse(nullType.supportsOutputConversion(int.class));
}
 
+   @Test
+   public void testTypeInformationAnyType() {
+   final TypeInformationAnyType anyType = new 
TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.INT));
+
+   testEquality(anyType, new 
TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG)));
+
+   testNullability(anyType);
+
+   testJavaSerializability(anyType);
+
+   testConversions(anyType, new Class[]{Tuple2.class}, new 
Class[]{Tuple.class});
+   }
+
+   @Test
+   public void testAnyType() {
+   testAll(
+   new AnyType<>(Human.class, new 
KryoSerializer<>(Human.class, new ExecutionConfig())),
+   
"ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " +
+   
"ADNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkxvZ2ljYWxUeXBlc1Rlc3QkSHVtYW4AAATyxpo9cAA"
 +
 
 Review comment:
   Could we not hardcode the snapshot? Changes to the KryoSerializer will break 
this test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280744074
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
 ##
 @@ -285,6 +286,27 @@ public void testDayTimeIntervalType() {
);
}
 
+   @Test
 
 Review comment:
   How about we add a check or two for different supported array conversions?
   E.g. to verify that `new ArrayType(new ArrayType(new TimestampType()))` does 
not support `java.sql.Timestamp[].class`. 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280733755
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
 ##
 @@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of a time WITHOUT timezone consisting of {@code 
hour:minute:second[.fractional]} with up
 
 Review comment:
   nit: Can `Time` have a timezone in general? Doesn't timezone imply there has 
to be date component?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280738815
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type for a group of year-month interval types. The type must be 
parameterized to one of
+ * the following resolutions: interval of years, interval of years to months, 
or interval of months.
+ *
+ * An interval of year-month consists of {@code +years-months} with values 
ranging from {@code --11}
+ * to {@code +-11}. The value is the same for all resolutions of this 
group (for example, an
 
 Review comment:
   I don't get this example...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280755454
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DistinctType.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Logical type of a user-defined distinct type. A distinct type specifies an 
identifier and is backed
+ * by a source type. A distinct types has the same internal representation as 
a source type, but is
 
 Review comment:
   `types` -> `type`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280762367
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of an arbitrary serialized type. This type is a black box 
within the table ecosystem
+ * and is only deserialized at the edges. The any type is an extension to the 
SQL standard.
+ *
+ * The serialized string representation is {@code ANY(c, s)} where {@code 
s} is the originating
+ * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in 
Base64 encoding.
+ *
+ * @param  originating class for this type
+ */
+@PublicEvolving
+public final class AnyType extends LogicalType {
+
+   private static final String DEFAULT_FORMAT = "ANY(%s, %s)";
+
+   private static final Set INPUT_OUTPUT_CONVERSION = 
conversionSet(
+   byte[].class.getName(),
+   "org.apache.flink.table.dataformat.BinaryGeneric");
+
+   private final Class clazz;
+
+   private final TypeSerializer serializer;
+
+   private transient String serializerString;
+
+   public AnyType(boolean isNullable, Class clazz, TypeSerializer 
serializer) {
+   super(isNullable, LogicalTypeRoot.ANY);
+   this.clazz = Preconditions.checkNotNull(clazz, "Class must not 
be null.");
+   this.serializer = Preconditions.checkNotNull(serializer, 
"Serializer must not be null.");
+   }
+
+   public AnyType(Class clazz, TypeSerializer serializer) {
+   this(true, clazz, serializer);
+   }
+
+   public Class getOriginatingClass() {
+   return clazz;
+   }
+
+   public TypeSerializer getTypeSerializer() {
+   return serializer;
+   }
+
+   @Override
+   public LogicalType copy(boolean isNullable) {
+   return new AnyType<>(isNullable, clazz, serializer.duplicate());
+   }
+
+   @Override
+   public String asSummaryString() {
+   return withNullability(DEFAULT_FORMAT, clazz.getName(), "...");
+   }
+
+   @Override
+   public String asSerializableString() {
+   return withNullability(DEFAULT_FORMAT, clazz.getName(), 
getOrCreateSerializerString());
+   }
+
+   @Override
+   public boolean supportsInputConversion(Class clazz) {
+   return this.clazz.isAssignableFrom(clazz) ||
+   INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+   }
+
+   @Override
+   public boolean supportsOutputConversion(Class clazz) {
+   return clazz.isAssignableFrom(this.clazz) ||
+   INPUT_OUTPUT_CONVERSION.contains(clazz.getName());
+   }
+
+   @Override
+   public Class getDefaultOutputConversion() {
+   return clazz;
+   }
+
+   @Override
+   public List getChildren() {
+   return Collections.emptyList();
+   }
+
+   @Override
+   public  R accept(LogicalTypeVisitor visitor) {
+   return visitor.visit(this);
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   if (!super.equals(o)) {
+   return false;
+   }
+  

[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280761541
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of an arbitrary serialized type. This type is a black box 
within the table ecosystem
+ * and is only deserialized at the edges. The any type is an extension to the 
SQL standard.
+ *
+ * The serialized string representation is {@code ANY(c, s)} where {@code 
s} is the originating
 
 Review comment:
   `{@code s}` -> `{@code c}`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280738605
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type for a group of year-month interval types. The type must be 
parameterized to one of
+ * the following resolutions: interval of years, interval of years to months, 
or interval of months.
+ *
+ * An interval of year-month consists of {@code +years-months} with values 
ranging from {@code --11}
+ * to {@code +-11}. The value is the same for all resolutions of this 
group (for example, an
+ * interval of months of 50 leads to {@code +04-02}).
+ *
+ * The serialized string representation is {@code INTERVAL YEAR(p)}, {@code 
INTERVAL YEAR(p) TO MONTH},
+ * or {@code INTERVAL YEAR(p)} where {@code p} is the number of digits of 
years (=year precision).
 
 Review comment:
   `INTERVAL YEAR(p)` -> `INTERVAL MONTH`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on issue #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces

2019-05-03 Thread GitBox
zentol commented on issue #8316: [FLINK-12369] [coordination] Implement a 
region failover strategy regarding the next version FailoverStrategy interfaces
URL: https://github.com/apache/flink/pull/8316#issuecomment-489086627
 
 
   We should also add some tests for the `Execution*` classes to ensure the 
implementation are correct.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-03 Thread GitBox
twalthr commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280763754
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of a variable-length character string.
+ *
+ * The serialized string representation is {@code VARCHAR(n)} where {@code 
n} is the maximum
+ * number of code points. {@code n} must have a value between 1 and {@link 
Integer#MAX_VALUE} (both
+ * inclusive). If no length is specified, {@code n} is equal to 1.
+ */
+@PublicEvolving
+public final class VarCharType extends LogicalType {
+
+   private static final int MIN_LENGTH = 1;
+
+   private static final int MAX_LENGTH = Integer.MAX_VALUE;
+
+   private static final int DEFAULT_LENGTH = 1;
+
+   private static final String DEFAULT_FORMAT = "VARCHAR(%d)";
 
 Review comment:
   It depends on the type. E.g. `ROW` and `ARRAY` could be represented in 
different ways. But I will correct it to `FORMAT` while merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces

2019-05-03 Thread GitBox
zentol commented on a change in pull request #8316: [FLINK-12369] 
[coordination] Implement a region failover strategy regarding the next version 
FailoverStrategy interfaces
URL: https://github.com/apache/flink/pull/8316#discussion_r280747402
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/Flip1PipelinedFailoverRegionBuildingTest.java
 ##
 @@ -0,0 +1,899 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests that make sure that the building of pipelined connected failover 
regions works
+ * correctly.
+ */
+public class Flip1PipelinedFailoverRegionBuildingTest extends TestLogger {
+
+   /**
+* Tests that validates that a graph with single unconnected vertices 
works correctly.
+*
+* 
+* (v1)
+*
+* (v2)
+*
+* (v3)
+*
+* ...
+* 
+*/
+   @Test
+   public void testIndividualVertices() throws Exception {
+   final JobVertex source1 = new JobVertex("source1");
+   source1.setInvokableClass(NoOpInvokable.class);
+   source1.setParallelism(2);
+
+   final JobVertex source2 = new JobVertex("source2");
+   source2.setInvokableClass(NoOpInvokable.class);
+   source2.setParallelism(2);
+
+   final JobGraph jobGraph = new JobGraph("test job", source1, 
source2);
+   final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+   RestartPipelinedRegionStrategy failoverStrategy = new 
RestartPipelinedRegionStrategy(eg);
+   FailoverRegion sourceRegion11 = 
failoverStrategy.getFailoverRegion(eg.getJobVertex(source1.getID()).getTaskVertices()[0].getExecutionVertexID());
+   FailoverRegion sourceRegion12 = 
failoverStrategy.getFailoverRegion(eg.getJobVertex(source1.getID()).getTaskVertices()[1].getExecutionVertexID());
+   FailoverRegion targetRegion21 = 
failoverStrategy.getFailoverRegion(eg.getJobVertex(source2.getID()).getTaskVertices()[0].getExecutionVertexID());
+   FailoverRegion targetRegion22 = 
failoverStrategy.getFailoverRegion(eg.getJobVertex(source2.getID()).getTaskVertices()[1].getExecutionVertexID());
+
+   assertTrue(sourceRegion11 != sourceRegion12);
+   assertTrue(sourceRegion12 != targetRegion21);
+   assertTrue(targetRegion21 != targetRegion22);
+   }
+
+   /**
+  

[GitHub] [flink] zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces

2019-05-03 Thread GitBox
zentol commented on a change in pull request #8316: [FLINK-12369] 
[coordination] Implement a region failover strategy regarding the next version 
FailoverStrategy interfaces
URL: https://github.com/apache/flink/pull/8316#discussion_r280752971
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A failover strategy that proposes to restart involved regions when a vertex 
fails.
+ * A region is defined by this strategy as tasks that communicate via 
pipelined data exchange.
+ */
+public class RestartPipelinedRegionStrategy implements FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
+
+   /** The topology containing info about all the vertices and edges. */
+   private final FailoverTopology topology;
+
+   /** Maps execution vertex id to failover region. */
+   private final Map regions;
+
+   /**
+* Creates a new failover strategy to restart pipelined regions that 
works on the given topology.
+*
+* @param topology containing info about all the vertices and edges
+*/
+   public RestartPipelinedRegionStrategy(FailoverTopology topology) {
+   this.topology = checkNotNull(topology);
+   this.regions = new HashMap<>();
+
+   // build regions based on the given topology
+   LOG.info("Start building failover regions.");
+   buildFailoverRegions();
+   }
+
+   // 

+   //  region building
+   // 

+
+   private void buildFailoverRegions() {
+   // currently we let a job with co-location constraints fail as 
one region
+   // putting co-located vertices in the same region with each 
other can be a future improvement
+   if (topology.containsCoLocationConstraints()) {
+   buildOneRegionForAllVertices();
+   return;
+   }
+
+   // we use the map (list -> null) to imitate an IdentityHashSet 
(which does not exist)
+   // this helps to optimize the building performance as it uses 
reference equality
+   final IdentityHashMap> 
vertexToRegion = new IdentityHashMap<>();
+
+   // iterate all the vertices which are topologically sorted
+   for (FailoverVertex vertex : topology.getFailoverVertices()) {
+   HashSet currentRegion = null;
 
 Review comment:
   you could initialize this to `new HashSet<>(1)`, which would simplify the 
loop since `currentRegion`would never be null. We would in all cases go through 
the merge process.
   
   You could then move `vertexToRegion.put(vertex, currentRegion);` also to the 
top of the loop.
   
   The end result would be:
   ```
   // iterate all the vertices which are topologically sorted
   for (FailoverVertex vertex : topology.getFailoverVertices()) {
HashSet currentRegion = new HashSet<>(1);
currentRegion.add(vertex);
vertexToRegion.put(vertex, currentRegion);
   
for (FailoverEdge inputEdge : vertex.getInputEdges()) {
if (inputEdge.getResultPartitionType().isPipelined()) {
final FailoverVertex producerVertex = 
inputEdge.getSourceVertex()

[GitHub] [flink] zentol commented on a change in pull request #8316: [FLINK-12369] [coordination] Implement a region failover strategy regarding the next version FailoverStrategy interfaces

2019-05-03 Thread GitBox
zentol commented on a change in pull request #8316: [FLINK-12369] 
[coordination] Implement a region failover strategy regarding the next version 
FailoverStrategy interfaces
URL: https://github.com/apache/flink/pull/8316#discussion_r280758391
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A failover strategy that proposes to restart involved regions when a vertex 
fails.
+ * A region is defined by this strategy as tasks that communicate via 
pipelined data exchange.
+ */
+public class RestartPipelinedRegionStrategy implements FailoverStrategy {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
+
+   /** The topology containing info about all the vertices and edges. */
+   private final FailoverTopology topology;
+
+   /** Maps execution vertex id to failover region. */
+   private final Map regions;
+
+   /**
+* Creates a new failover strategy to restart pipelined regions that 
works on the given topology.
+*
+* @param topology containing info about all the vertices and edges
+*/
+   public RestartPipelinedRegionStrategy(FailoverTopology topology) {
+   this.topology = checkNotNull(topology);
+   this.regions = new HashMap<>();
+
+   // build regions based on the given topology
+   LOG.info("Start building failover regions.");
+   buildFailoverRegions();
+   }
+
+   // 

+   //  region building
+   // 

+
+   private void buildFailoverRegions() {
+   // currently we let a job with co-location constraints fail as 
one region
+   // putting co-located vertices in the same region with each 
other can be a future improvement
+   if (topology.containsCoLocationConstraints()) {
+   buildOneRegionForAllVertices();
+   return;
+   }
+
+   // we use the map (list -> null) to imitate an IdentityHashSet 
(which does not exist)
+   // this helps to optimize the building performance as it uses 
reference equality
+   final IdentityHashMap> 
vertexToRegion = new IdentityHashMap<>();
+
+   // iterate all the vertices which are topologically sorted
+   for (FailoverVertex vertex : topology.getFailoverVertices()) {
+   HashSet currentRegion = null;
+   for (FailoverEdge inputEdge : vertex.getInputEdges()) {
+   if 
(inputEdge.getResultPartitionType().isPipelined()) {
+   final FailoverVertex producerVertex = 
inputEdge.getSourceVertex();
+   final HashSet 
producerRegion = vertexToRegion.get(producerVertex);
+   if (currentRegion == null) {
+   // use producer region as 
current region and add current vertex to it
+   if (producerRegion == null) {
+   throw new 
IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
+

  1   2   >