[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137972#comment-16137972
 ] 

ASF GitHub Bot commented on FLINK-6244:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4320


> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs, I think timed-out patterns should be emitted 
> into them rather than producing a stream of `Either`.
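The difference the issue describes can be sketched without Flink: with `Either`, one stream interleaves both result kinds and every consumer must unwrap both cases, while a side output routes timed-out patterns to a separately tagged output. A minimal, Flink-free Java sketch (the `Event` record and `split` helper are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SideOutputSketch {
    public record Event(String pattern, boolean timedOut) {}

    // Splits results the way a side output would: the main output carries
    // matched patterns, the side output carries timed-out ones.
    public static Map<String, List<String>> split(List<Event> results) {
        List<String> matched = new ArrayList<>();
        List<String> timedOut = new ArrayList<>();
        for (Event e : results) {
            (e.timedOut() ? timedOut : matched).add(e.pattern());
        }
        return Map.of("main", matched, "timedOut", timedOut);
    }

    public static void main(String[] args) {
        List<Event> results = List.of(
            new Event("A->B", false),
            new Event("A->?", true));
        Map<String, List<String>> out = split(results);
        System.out.println(out.get("main"));     // [A->B]
        System.out.println(out.get("timedOut")); // [A->?]
    }
}
```

Consumers that only care about matches subscribe to the main output and never see the timed-out case at all, which is the ergonomic win over `Either`.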



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[GitHub] flink issue #4320: [FLINK-6244] Emit timeouted Patterns as Side Output

2017-08-22 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4320
  
I will merge it then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137878#comment-16137878
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
@fhueske @wuchong 

Thank you for your suggestions. I have updated the PR and added test cases.
Do you have time to take a look?

Thanks, Kaibo


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.






[jira] [Commented] (FLINK-7129) Dynamically changing patterns

2017-08-22 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137864#comment-16137864
 ] 

Dian Fu commented on FLINK-7129:


Hi [~dawidwys],
I have thought about the issue you mentioned for a few days and believe it may 
not be a big problem. The patterns belong to all keys and can be stored in the 
{{OperatorStateBackend}}. Then there is no need to distribute the patterns to 
all keys; we only need to distribute them to all parallel instances of the 
operator. Please correct me if my understanding is wrong. Thanks a lot.
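The proposal can be illustrated with a toy sketch: the pattern set lives once per operator instance (as operator state would provide) and is consulted for every key that instance processes, so no per-key copies are needed. All names here are hypothetical, not Flink API:

```java
import java.util.HashSet;
import java.util.Set;

public class OperatorScopedPatterns {
    // One pattern set per operator instance (operator state), shared by all
    // keys that this instance processes -- rather than one copy per key.
    private final Set<String> patterns = new HashSet<>();

    public void onPatternUpdate(String pattern) {
        patterns.add(pattern);
    }

    // Every keyed element consults the same instance-wide pattern set.
    public boolean matches(String key, String event) {
        return patterns.contains(event);
    }

    public static void main(String[] args) {
        OperatorScopedPatterns op = new OperatorScopedPatterns();
        op.onPatternUpdate("login-failed");
        System.out.println(op.matches("user-1", "login-failed")); // true
        System.out.println(op.matches("user-2", "login-failed")); // true, no per-key copy
    }
}
```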

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> coStream





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137860#comment-16137860
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(

    fieldTerm
  }
+
+  /**
+    * Adds a reusable class to the member area of the generated [[Function]].
+    */
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+    val field =
+      s"""
+         |transient ${clazz.getCanonicalName} $fieldTerm = null;
+         |""".stripMargin
+    reusableMemberStatements.add(field)
+  }
+
+  /**
+    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+    *
+    * @param indices indices of aggregate functions.
+    * @param ctxTerm field name of runtime context.
+    * @param accConfig data view config which contains id, field and StateDescriptors.
+    * @return statements to create [[MapView]] or [[ListView]].
+    */
+  def addReusableDataViewConfig(
+      indices: Range,
+      ctxTerm: String,
+      accConfig: Option[DataViewConfig])
+    : String = {
+    if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+      val initDataViews = new StringBuilder
+      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
+        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+        .toMap[String, StateDescriptor[_, _]]
+
+      for (i <- indices) yield {
+        for (spec <- accConfig.get.accSpecs(i)) yield {
+          val dataViewField = spec.field
+          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+          val desc = descMapping.getOrElse(spec.id,
+            throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+          val serializedData = AggregateUtil.serialize(desc)
+          val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
+          val field =
+            s"""
+               |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+               |""".stripMargin
+          reusableMemberStatements.add(field)
+
+          val descFieldTerm = s"${dataViewFieldTerm}_desc"
+          val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+          val descDeserialize =
+            s"""
+               |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+               |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+               |  .deserialize("$serializedData");
+             """.stripMargin
+
+          val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+            s"""
+               |$descDeserialize
+               |$dataViewFieldTerm =
+               |  org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

Agreed, it can be code-generated now.
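The serialize-then-embed technique in the diff above, turning a descriptor into a string literal inside generated source code and restoring the object at runtime, can be sketched generically. The `encode`/`decode` helpers below are stand-ins under the assumption of plain Java serialization; they are not the actual `AggregateUtil` methods:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

public class EmbedSerialized {
    // Encode any serializable object as a Base64 string that can be
    // pasted into generated source code as a plain string literal.
    public static String encode(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    // The generated code calls this at runtime to restore the object.
    public static Object decode(String data) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(data);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // A String stands in here for a serializable StateDescriptor.
        String embedded = encode("map-view-descriptor");
        System.out.println(decode(embedded)); // prints "map-view-descriptor"
    }
}
```

Base64 is the natural choice because the output contains no characters that would need escaping inside a generated string literal.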




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-22 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653928
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(

    fieldTerm
  }
+
+  /**
+    * Adds a reusable class to the member area of the generated [[Function]].
+    */
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+    val field =
+      s"""
+         |transient ${clazz.getCanonicalName} $fieldTerm = null;
+         |""".stripMargin
+    reusableMemberStatements.add(field)
+  }
+
+  /**
+    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+    *
+    * @param indices indices of aggregate functions.
+    * @param ctxTerm field name of runtime context.
+    * @param accConfig data view config which contains id, field and StateDescriptors.
+    * @return statements to create [[MapView]] or [[ListView]].
+    */
+  def addReusableDataViewConfig(
--- End diff --

It's a good idea; the code will be cleaner after the refactoring.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137854#comment-16137854
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653871
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(

    fieldTerm
  }
+
+  /**
+    * Adds a reusable class to the member area of the generated [[Function]].
+    */
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Yes, RuntimeContext does not need to be a member variable.





[jira] [Assigned] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-08-22 Thread Fang Yong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong reassigned FLINK-6864:


Assignee: Fang Yong

> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} logs 
> warnings such as ".. must have a default constructor to be used as a 
> POJO." or "... is not a valid POJO type because not all fields are valid 
> POJO fields." in the {{analyzePojo}} method.
> These messages are often misleading, causing users to think that the job 
> should have failed, whereas in these cases Flink simply falls back to Kryo 
> and treats the types as generic types. We should remove these messages and, 
> at the same time, improve the type serialization docs at [1] to explicitly 
> explain what it means when Flink does or does not recognize a user type as 
> a POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types
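For context, a subset of the documented POJO rules (public class, public no-argument constructor, public fields) can be checked with plain reflection. This sketch is illustrative only and intentionally ignores the getter/setter alternative for non-public fields:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {
    // Partial check of Flink's POJO rules: public class, public no-arg
    // constructor, and all declared fields public.
    public static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false; // this is the case the warning complains about
        }
        for (Field f : clazz.getDeclaredFields()) {
            if (!Modifier.isPublic(f.getModifiers())) {
                return false;
            }
        }
        return true;
    }

    public static class Valid {
        public int x;
        public Valid() {}
    }

    public static class NoDefaultCtor {
        public int x;
        public NoDefaultCtor(int x) { this.x = x; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Valid.class));         // true
        System.out.println(looksLikePojo(NoDefaultCtor.class)); // false
    }
}
```

A type failing such a check is not an error in Flink; it is merely serialized with Kryo as a generic type, which is exactly the point the issue wants the log messages and docs to convey.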





[GitHub] flink issue #4574: [FLINK-6864] Fix confusing "invalid POJO type" messages f...

2017-08-22 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4574
  
@tzulitai I created this PR to fix 
[https://issues.apache.org/jira/browse/FLINK-6864](https://issues.apache.org/jira/browse/FLINK-6864).
I think improving the logs instead of removing them would be better. What do 
you think? Thanks.





[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137827#comment-16137827
 ] 

ASF GitHub Bot commented on FLINK-6864:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4574

[FLINK-6864] Fix confusing "invalid POJO type" messages from TypeExtractor

## What is the purpose of the change

Fix confusing "invalid POJO type" messages from TypeExtractor

## Brief change log

  - *Improve the log messages about POJOs in TypeExtractor*
  - *Add a notice in types_serialization.md about rules-for-pojo-types*

## Verifying this change

This change needs no testing

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6864

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4574.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4574


commit 5b3861716404c1d1cce9f18af7ca596a21ecb014
Author: zjureel 
Date:   2017-08-23T02:30:31Z

[FLINK-6864] Fix confusing "invalid POJO type" messages from TypeExtractor







[jira] [Commented] (FLINK-7268) Zookeeper Checkpoint Store interacting with Incremental State Handles can lead to loss of handles

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137806#comment-16137806
 ] 

ASF GitHub Bot commented on FLINK-7268:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4410#discussion_r134647310
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -1044,10 +1049,23 @@ public boolean restoreLatestCheckpointedState(
				throw new IllegalStateException("CheckpointCoordinator is shut down");
			}

-			// Recover the checkpoints
-			completedCheckpointStore.recover(sharedStateRegistry);
+			// We create a new shared state registry object, so that all pending async disposal
+			// requests from previous runs will go against the old object (where they can do no harm).
+			// This must happen under the checkpoint lock.
+			sharedStateRegistry.close();
+			sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+
+			// Recover the checkpoints. TODO: this could be done only when there is a new leader,
+			// not on each recovery
--- End diff --

If we use highAvailabilityServices.getJobManagerLeaderRetriever(), the job ID 
is required. Can the job ID be obtained from a JobVertexID?


> Zookeeper Checkpoint Store interacting with Incremental State Handles can 
> lead to loss of handles
> -
>
> Key: FLINK-7268
> URL: https://issues.apache.org/jira/browse/FLINK-7268
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0, 1.3.1, 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
> Attachments: gce_rocks_incr_external_gs-more-logs.txt
>
>
> Release testing for Flink 1.3.2 has shown that this combination of features 
> leads to errors when using a very low restart delay:
> {code}
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: Item not found: 
> aljoscha/state-machine-checkpoints-2/f26e2b4c6891f2a9e0c5e4ba014733c3/chk-3/b246db8c-4f25-483a-b1fc-234f4319004d
>   at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.getFileNotFoundException(GoogleCloudStorageExceptions.java:42)
>   at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:551)
>   at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:322)
>   at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:121)
>   at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1076)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1281)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1468)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1324)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRe


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-22 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137794#comment-16137794
 ] 

Jark Wu commented on FLINK-7446:


In the long run, we can provide a new interface called {{DefinedWatermark}}, 
which has two methods {{getRowtimeAttribute}} (can only be an existing field) 
and {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
deprecated. In addition, we can provide some built-in watermark generators, 
such as {{AscendingTimestamp}} and {{BoundedOutOfOrderness}}. What do you think? 
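
A rough sketch of what the proposed interface could look like (all signatures here are assumptions for illustration; only the names {{DefinedWatermark}}, {{getRowtimeAttribute}} and {{getWatermarkGenerator}} come from the comment above):

```java
// Hypothetical sketch of the proposed interface; not actual Flink API.
public class DefinedWatermarkSketch {

    /** Assumed generator abstraction, e.g. AscendingTimestamp or BoundedOutOfOrderness. */
    public interface WatermarkGenerator {
        long getWatermark(long maxSeenTimestamp);
    }

    /** Proposed successor of DefinedRowtimeAttribute. */
    public interface DefinedWatermark {
        /** Name of an existing field to use as the rowtime attribute. */
        String getRowtimeAttribute();

        /** Strategy for deriving watermarks from that field. */
        WatermarkGenerator getWatermarkGenerator();
    }

    /** Example built-in generator: bounded out-of-orderness with a fixed delay. */
    public static WatermarkGenerator boundedOutOfOrderness(long delayMillis) {
        return maxSeen -> maxSeen - delayMillis;
    }
}
```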

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we can support to define an 
> existing field as the rowtime field. Just like registering a DataStream, the 
> rowtime field can be appended but also can replace an existing field.





[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137761#comment-16137761
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4562
  
@tedyu @NicoK Thank you for your suggestions; it sounds good to me, thanks


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.
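
The fix amounts to making the try block and the finally block agree about whether {{buffer}} can be null, e.g. by failing fast before the dereference. A simplified stand-alone illustration ({{Buffer}} here is a stand-in, not Flink's actual network buffer class):

```java
// Simplified illustration of the reported pattern; Buffer is a stand-in type.
public class NullCheckSketch {

    public interface Buffer {
        void recycle();
        int size();
    }

    // Problematic shape: with buffer == null, the try block throws a
    // NullPointerException before the finally-block null check can matter.
    public static void writeUnsafe(Buffer buffer) {
        try {
            int n = buffer.size(); // dereference without a guard
        } finally {
            if (buffer != null) {  // ineffective: the guard contradicts the code above
                buffer.recycle();
            }
        }
    }

    // Fixed shape: fail fast before entering the try/finally, so the
    // cleanup in finally can assume a non-null buffer.
    public static int writeSafe(Buffer buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        try {
            return buffer.size();
        } finally {
            buffer.recycle(); // always recycled, even if size() throws
        }
    }
}
```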





[GitHub] flink issue #4562: [FLINK-7402] Fix ineffective null check in NettyMessage#w...

2017-08-22 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4562
  
@tedyu @NicoK Thank you for your suggestions; it sounds good to me, thanks




[jira] [Updated] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-22 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-7491:
---
Component/s: Table API & SQL

> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Assigned] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages

2017-08-22 Thread ChungChe Lai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChungChe Lai reassigned FLINK-6004:
---

Assignee: ChungChe Lai

> Allow FlinkKinesisConsumer to skip corrupted messages
> -
>
> Key: FLINK-6004
> URL: https://issues.apache.org/jira/browse/FLINK-6004
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: ChungChe Lai
>
> It is quite clear from the fix of FLINK-3679 that in reality, users might 
> encounter corrupted messages from Kafka / Kinesis / generally external 
> sources when deserializing them.
> The consumers should support simply skipping those messages, by letting the 
> deserialization schema return {{null}}, and checking {{null}} values within 
> the consumer.
> This has been done for the Kafka consumer already. This ticket tracks the 
> improvement for the Kinesis consumer.
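
The pattern described above can be sketched as follows (the schema interface and the collect loop are simplified stand-ins, not the actual Kinesis consumer code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the skip-on-corruption pattern: the schema returns null for
// corrupted records, and the consumer-side loop silently drops null values.
public class SkipCorruptedSketch {

    public interface DeserializationSchema<T> {
        /** Returns null for corrupted records instead of throwing. */
        T deserialize(byte[] record);
    }

    /** Consumer-side loop: null results are skipped, valid ones collected. */
    public static <T> List<T> deserializeAll(List<byte[]> records,
                                             DeserializationSchema<T> schema) {
        List<T> out = new ArrayList<>();
        for (byte[] record : records) {
            T value = schema.deserialize(record);
            if (value != null) { // skip corrupted messages
                out.add(value);
            }
        }
        return out;
    }
}
```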





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI&SQL

2017-08-22 Thread Jacob Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137601#comment-16137601
 ] 

Jacob Park commented on FLINK-7465:
---

[~sunjincheng121] [~fhueske] May I recommend referring to Apache Spark's JIRA 
ticket for a similar problem?: 
https://issues.apache.org/jira/browse/SPARK-12818. See 
https://github.com/apache/spark/tree/branch-2.2/common/sketch/src/main/java/org/apache/spark/util/sketch
 for examples. Twitter also has great examples of probabilistic and algebraic 
data structures: https://github.com/twitter/algebird.

[~sunjincheng121] If I understand the use-case correctly, you want to implement 
built-in approximation functions for counting distinct elements? I recommend 
not using a Bloom Filter. A Bloom Filter is a great candidate for querying 
set-membership and classification of elements as unique or duplicate. However, 
a Bloom Filter is a poor candidate for approximating the cardinality of 
elements of a set relative to the size of its multiset. 

To me, it sounds nonsensical to provide a maxKeyCount parameter. Use a CMS or 
a HyperLogLog. Please see slide 48 in 
http://lintool.github.io/UMD-courses/bigdata-2015-Spring/slides/session11.pdf. 
These slides are from Jimmy Lin (https://cs.uwaterloo.ca/~jimmylin/index.html); 
he provides a great overview of what probabilistic data structures to use for 
the desired query.

> Add build-in BloomFilterCount on TableAPI&SQL
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA. use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :-)
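
The description above maps directly to a minimal implementation (the parameters and the double-hashing scheme below are illustrative choices, not taken from the ticket):

```java
import java.util.BitSet;

// Minimal Bloom filter following the description above: m bits, k hash
// functions derived from two base hashes (Kirsch-Mitzenmacher scheme).
public class BloomFilterSketch {
    private final BitSet bits;
    private final int m; // number of bits
    private final int k; // number of hash functions

    public BloomFilterSketch(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // i-th array position for an element, in [0, m).
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, m);
    }

    /** To add an element, set the bits at its k positions to 1. */
    public void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    /** False means definitely absent; true means present or a false positive. */
    public boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;
            }
        }
        return true;
    }
}
```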





[jira] [Assigned] (FLINK-4004) Do not pass custom flink kafka connector properties to Kafka to avoid warnings

2017-08-22 Thread Luffy Tsai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luffy Tsai reassigned FLINK-4004:
-

Assignee: Luffy Tsai

> Do not pass custom flink kafka connector properties to Kafka to avoid warnings
> --
>
> Key: FLINK-4004
> URL: https://issues.apache.org/jira/browse/FLINK-4004
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Luffy Tsai
>
> The FlinkKafkaConsumer has some custom properties, which we pass to the 
> KafkaConsumer as well (such as {{flink.poll-timeout}}). This causes Kafka to 
> log warnings about unused properties.
> We should not pass Flink-internal properties to Kafka, to avoid those 
> warnings.





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI&SQL

2017-08-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137425#comment-16137425
 ] 

Fabian Hueske commented on FLINK-7465:
--

I'm sorry, I confused count-min sketches (for approximate group counts) and 
HyperLogLog (for approximate distinct counts). 

I assume the goal of the BloomFilterCount function is to (approximately) count 
the number of distinct values. In contrast to HyperLogLog, Bloom filters are 
not specifically designed for approximate distinct counting but for approximate 
membership testing. AFAIK, Bloom filters should be more precise for low 
distinct cardinalities, but HyperLogLog should provide much better results for 
larger cardinalities.

IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty 
nice. OTOH, multiple RocksDB point lookups might also be more expensive than a 
single lookup with a larger serialization payload (the deserialization logic 
for byte arrays shouldn't be very costly).
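
The bitmask-splitting idea can be sketched as follows (purely illustrative; how the individual long values would map to state entries is not specified here):

```java
// Sketch of splitting an m-bit mask into 64-bit words so that individual
// words can be read/updated separately (e.g. as separate state entries).
public class SplitBitmaskSketch {
    private final long[] words;

    public SplitBitmaskSketch(int bits) {
        this.words = new long[(bits + 63) / 64]; // round up to whole words
    }

    /** Which long value a given bit lives in. */
    public int wordIndex(int bit) {
        return bit >>> 6; // bit / 64
    }

    public void set(int bit) {
        words[wordIndex(bit)] |= 1L << (bit & 63);
    }

    public boolean get(int bit) {
        return (words[wordIndex(bit)] & (1L << (bit & 63))) != 0;
    }
}
```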

> Add build-in BloomFilterCount on TableAPI&SQL
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA. use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :-)





[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137422#comment-16137422
 ] 

ASF GitHub Bot commented on FLINK-6465:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4556
  
Hi @fhueske @twalthr @wuchong @shaoxuan-wang, I would appreciate it if you 
could review the PR.

Thanks, jincheng


> support FIRST_VALUE on Table API & SQL
> --
>
> Key: FLINK-6465
> URL: https://issues.apache.org/jira/browse/FLINK-6465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: sunjincheng
>
> {{FIRST_VALUE}} is a OVER WINDOW function. In this JIRA. will add 
> {{FIRST_VALUE}} function support on TableAPI & SQL.
> *Syntax:*
> FIRST_VALUE ( [scalar_expression ] )   
> OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] )  
> [About OVER 
> WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows]
> scalar_expression
> Is the value to be returned. scalar_expression can be a column, or other 
> arbitrary expression that results in a single value. Other analytic functions 
> are not permitted.
> *NOTE:*
> *  {{FIRST_VALUE}} if used for OVER WINDOW, e.g.: 
> {code}
> SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab
> {code}
> * OVER WINDOW's retraction is expensive(currently not supported yet), and 
> this JIRA. does not implement Retract logic of {{FIRST_VALUE}}.





[GitHub] flink issue #4556: [FLINK-6465][table]support FIRST_VALUE on Table API & SQL

2017-08-22 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4556
  
Hi @fhueske @twalthr @wuchong @shaoxuan-wang, I would appreciate it if you 
could review the PR.

Thanks, jincheng




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137358#comment-16137358
 ] 

Fabian Hueske commented on FLINK-7446:
--

I agree that the watermark generation for TableSources should be reworked. 
However, I'm not sure if we will achieve that before the next release. 
If we want to support defining an event-time indicator as an existing field 
and want to be sure that the implementation of the {{TableSource}} is correct, 
we would have to compare the {{StreamRecord}} timestamp with the existing 
attribute at runtime for each record.

However, since {{TableSource}} is an interface for rather experienced users 
(more DBA than DB user), I'd be fine with omitting this safety check. 

In the long run I agree with [~xccui]. We should have a watermark generator 
interface that works on an existing field with built-in implementations for the 
common cases.

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we can support to define an 
> existing field as the rowtime field. Just like registering a DataStream, the 
> rowtime field can be appended but also can replace an existing field.





[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG

2017-08-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137343#comment-16137343
 ] 

Fabian Hueske commented on FLINK-7471:
--

I understand the motivation for this issue and agree that it would be good to 
support non-retractable UDAGGs for bounded OVER aggregations.
My question is rather what happens if we have a mix of retractable and 
non-retractable aggregation functions? Are the retractable functions 
efficiently computed or not? 

Treating all aggregation functions as non-retractable might introduce a 
significant and unnecessary performance penalty. Think of a query with 
several retractable aggregations and then adding one non-retractable UDAGG.

Handling retractable and non-retractable aggregation functions separately would 
mean to add quite a bit of additional logic. For example, we would need to 
extend the {{GeneratedAggregations}} interface to account for "empty" fields in 
the output record which would be filled by the other aggregation logic. 
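
For reference, the accumulate/retract pair the discussion revolves around, reduced to a toy count aggregate (a simplified stand-in, not Flink's actual UDAGG interface):

```java
// Toy count aggregate with both accumulate and retract. Without retract,
// removing a row from a bounded OVER window forces recomputation from scratch.
public class RetractableCountSketch {
    private long count;

    /** Called when a row enters the window. */
    public void accumulate() {
        count++;
    }

    /** Inverse of accumulate; called when a row leaves the window. */
    public void retract() {
        count--;
    }

    public long getValue() {
        return count;
    }
}
```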



> Improve bounded OVER support non-retract method AGG
> ---
>
> Key: FLINK-7471
> URL: https://issues.apache.org/jira/browse/FLINK-7471
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently BOUNDED OVER WINDOW only support have {{retract}} method AGG. In 
> this JIRA. will add non-retract method support.
> What do you think? [~fhueske]





[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-22 Thread Jacob Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137250#comment-16137250
 ] 

Jacob Park commented on FLINK-7398:
---

+1 for the checkstyle rule.

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/sca
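
The hazard and a common remedy can be illustrated with plain Java serialization (the classes below are made-up stand-ins, not the Flink classes listed above):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in demonstration of why an instance-level Logger field is serialized
// with the function, while a static field is not.
public class LoggerFieldSketch {

    /** Stand-in for an slf4j Logger implementation that is not serializable. */
    public static class FakeLogger { }

    // Problematic: the logger is an instance field, so it travels with the
    // serialized function and must be deserializable on the cluster.
    public static class UdfWithInstanceLogger implements Serializable {
        private final FakeLogger log = new FakeLogger();
    }

    // Remedy: a static field is not part of the serialized instance state;
    // it is re-initialized by the classloader on the receiving side.
    public static class UdfWithStaticLogger implements Serializable {
        private static final FakeLogger LOG = new FakeLogger();
    }

    /** Helper: whether the given object survives Java serialization. */
    public static boolean isJavaSerializable(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }
}
```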

[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2017-08-22 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137249#comment-16137249
 ] 

Jamie Grier commented on FLINK-4947:


[~gaborhermann] It's more than just that, but yes, I do suggest that you 
should be able to override what's in the config file on the command line.

More importantly though is that all *config* should be configurable via 
flink-conf.yaml.  We shouldn't add features that are only configurable from the 
*user code*.

An example of this used to be the RocksDB state backend.  If you wanted to use 
that backend and configure it in "async" mode you had to put this in 
application code, but that's not great for separation of concerns between 
application developers and ops/platform teams.

I know this isn't black-and-white but we should try to clearly separate 
configuration from user code by putting everything in flink-conf.yaml.  We 
should *also* make it possible to override any of those values on the command 
line when submitting a job.
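
As a concrete illustration of that separation, a hypothetical flink-conf.yaml fragment (the key names are illustrative and may differ between Flink versions; consult the configuration docs for your release):

```yaml
# Ops/platform-owned configuration, kept out of application code.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/external-checkpoints
parallelism.default: 4
```

The command-line override of any such value at job submission time is the second half of the proposal.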



> Make all configuration possible via flink-conf.yaml and CLI.
> 
>
> Key: FLINK-4947
> URL: https://issues.apache.org/jira/browse/FLINK-4947
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
>
> I think it's important to make all configuration possible via the 
> flink-conf.yaml and the command line.
> As an example:  To enable "externalizedCheckpoints" you must actually call 
> the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from 
> your Flink program.
> Another example of this would be configuring the RocksDB state backend.
> I think it important to make deployment flexible and easy to build tools 
> around.  For example, the infrastructure teams that make these configuration 
> decisions and provide tools for deploying Flink apps, will be different from 
> the teams deploying apps.  The team writing apps should not have to set all 
> of this lower level configuration up in their programs.





[jira] [Closed] (FLINK-7123) Support timesOrMore in CEP

2017-08-22 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-7123.
-
Resolution: Fixed

Merged with 9995588c83536e04aef3425757a008ba4c78dbde

> Support timesOrMore in CEP
> --
>
> Key: FLINK-7123
> URL: https://issues.apache.org/jira/browse/FLINK-7123
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> The CEP API should provide API such as timesOrMore(#ofTimes) for quantifier 
> {n,}.





[jira] [Commented] (FLINK-7123) Support timesOrMore in CEP

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137120#comment-16137120
 ] 

ASF GitHub Bot commented on FLINK-7123:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4523


> Support timesOrMore in CEP
> --
>
> Key: FLINK-7123
> URL: https://issues.apache.org/jira/browse/FLINK-7123
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> The CEP API should provide API such as timesOrMore(#ofTimes) for quantifier 
> {n,}.





[GitHub] flink pull request #4523: [FLINK-7123] [cep] Support timesOrMore in CEP

2017-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4523




[jira] [Created] (FLINK-7493) Create Pinot Connector

2017-08-22 Thread Zhenqiu Huang (JIRA)
Zhenqiu Huang created FLINK-7493:


 Summary: Create Pinot Connector
 Key: FLINK-7493
 URL: https://issues.apache.org/jira/browse/FLINK-7493
 Project: Flink
  Issue Type: New Feature
Reporter: Zhenqiu Huang
Assignee: Zhenqiu Huang


Add pinot connector for streaming ingestion and batch segment file push.





[jira] [Created] (FLINK-7492) Memsql Connector

2017-08-22 Thread Zhenqiu Huang (JIRA)
Zhenqiu Huang created FLINK-7492:


 Summary: Memsql Connector
 Key: FLINK-7492
 URL: https://issues.apache.org/jira/browse/FLINK-7492
 Project: Flink
  Issue Type: New Feature
Reporter: Zhenqiu Huang
Assignee: Zhenqiu Huang


Add an output connector for both streaming and batch ingestion for Memsql. 






[jira] [Created] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-22 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-7491:
-

 Summary: Support COLLECT Aggregate function in Flink SQL
 Key: FLINK-7491
 URL: https://issues.apache.org/jira/browse/FLINK-7491
 Project: Flink
  Issue Type: New Feature
Reporter: Shuyi Chen
Assignee: Shuyi Chen








[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)

2017-08-22 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137075#comment-16137075
 ] 

Till Rohrmann commented on FLINK-4319:
--

Hi Bowen,

development of FLIP-6 has been under way for quite some time and we have 
already made some really good progress. We hope to get the rework done for the 
Flink 1.4 release. We are currently working on the REST endpoints, reaching 
functional parity with the existing code base, and thoroughly testing the 
new components. Every helping hand is highly welcome :-)

> Rework Cluster Management (FLIP-6)
> --
>
> Key: FLINK-4319
> URL: https://issues.apache.org/jira/browse/FLINK-4319
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> This is the root issue to track progress of the rework of cluster management 
> (FLIP-6) 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077





[jira] [Assigned] (FLINK-4319) Rework Cluster Management (FLIP-6)

2017-08-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-4319:


Assignee: Till Rohrmann

> Rework Cluster Management (FLIP-6)
> --
>
> Key: FLINK-4319
> URL: https://issues.apache.org/jira/browse/FLINK-4319
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> This is the root issue to track progress of the rework of cluster management 
> (FLIP-6) 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077





[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)

2017-08-22 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137064#comment-16137064
 ] 

Bowen Li commented on FLINK-4319:
-

Hi guys, what's the schedule for this task? When are we starting, and what's 
the target release?

> Rework Cluster Management (FLIP-6)
> --
>
> Key: FLINK-4319
> URL: https://issues.apache.org/jira/browse/FLINK-4319
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>  Labels: flip-6
>
> This is the root issue to track progress of the rework of cluster management 
> (FLIP-6) 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077





[jira] [Created] (FLINK-7490) UDF Agg throws Exception when flink-table is loaded with AppClassLoader

2017-08-22 Thread Miguel Rui Pereira Marques (JIRA)
Miguel Rui Pereira Marques created FLINK-7490:
-

 Summary: UDF Agg throws Exception when flink-table is loaded with 
AppClassLoader
 Key: FLINK-7490
 URL: https://issues.apache.org/jira/browse/FLINK-7490
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.1
Reporter: Miguel Rui Pereira Marques


When a UDF aggregation for the Batch Table API is defined in the 
FlinkUserCodeClassLoader and the Table API itself is loaded in the 
AppClassLoader (the jar is included in the lib directory), this exception is 
triggered:

{panel:title=Exception}
java.lang.Exception: The user defined 'open()' method caused an exception: 
Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.aggregate.DataSetAggFunction.compile(DataSetAggFunction.scala:35)
at 
org.apache.flink.table.runtime.aggregate.DataSetAggFunction.open(DataSetAggFunction.scala:49)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
... 3 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 13: 
Cannot determine simple type name "org"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
...
{panel}

Upon inspecting the code I think this may be due to the usage of 
'getClass.getClassLoader' instead of 'getRuntimeContext.getUserCodeClassLoader' 
as the argument to 'compile' in the 'open' method of class 
org.apache.flink.table.runtime.aggregate.DataSetAggFunction.
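The classloader mismatch can be sketched with plain-JDK loaders (the names below are illustrative stand-ins for Flink's classes, not its actual API): delegation only goes from child to parent, so code compiled against the AppClassLoader can never resolve classes that exist only in the user-code loader.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderVisibility {
    public static void main(String[] args) throws Exception {
        ClassLoader app = LoaderVisibility.class.getClassLoader();

        // Stand-in for FlinkUserCodeClassLoader: a child of the app loader
        // (in a real job it would additionally hold the user-jar URLs).
        ClassLoader user = new URLClassLoader(new URL[0], app);

        // Delegation goes child -> parent, so the user-code loader can see
        // everything the app loader can:
        System.out.println(Class.forName("java.util.ArrayList", false, user).getName());

        // The reverse does not hold: classes that exist only in the user jar
        // are invisible to the app loader. That is why code generation in
        // open() should compile against the runtime's user-code classloader
        // rather than getClass().getClassLoader().
        System.out.println(user.getParent() == app);
    }
}
```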





[jira] [Commented] (FLINK-7123) Support timesOrMore in CEP

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137030#comment-16137030
 ] 

ASF GitHub Bot commented on FLINK-7123:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4523
  
LGTM, merging this 


> Support timesOrMore in CEP
> --
>
> Key: FLINK-7123
> URL: https://issues.apache.org/jira/browse/FLINK-7123
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> The CEP API should provide API such as timesOrMore(#ofTimes) for quantifier 
> {n,}.







[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137006#comment-16137006
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134529419
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * This class links {@link RequestBody} types to {@link ResponseBody} types and contains meta-data required for their HTTP headers.
+ *
+ * Implementations must be state-less.
+ *
+ * @param <R> request type
+ * @param <P> response type
+ */
+public interface MessageHeaders<R extends RequestBody, P extends ResponseBody> {
--- End diff --

Alright, maybe this could be documented as part of the `@param ` 
description.
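The idea being discussed can be sketched as a hypothetical mini-version of the interface (names and methods below are illustrative, not the actual Flink API): the generic parameters tie one request type to one response type, and the headers object itself carries only static metadata, so it stays state-less.

```java
// Illustrative stand-ins for the real marker interfaces:
interface RequestBody {}
interface ResponseBody {}

// R and P link a request type to its response type at compile time.
interface Headers<R extends RequestBody, P extends ResponseBody> {
    String targetUrl();       // e.g. "/jobs/:jobid"
    String httpMethod();      // e.g. "GET"
    Class<P> responseClass(); // lets a client deserialize P without casts
}

public class HeadersDemo {
    static final class JobListRequest implements RequestBody {}
    static final class JobListResponse implements ResponseBody {}

    // One state-less headers object per REST call:
    static final class JobListHeaders implements Headers<JobListRequest, JobListResponse> {
        public String targetUrl() { return "/jobs"; }
        public String httpMethod() { return "GET"; }
        public Class<JobListResponse> responseClass() { return JobListResponse.class; }
    }

    public static void main(String[] args) {
        Headers<JobListRequest, JobListResponse> h = new JobListHeaders();
        System.out.println(h.httpMethod() + " " + h.targetUrl());
        System.out.println(h.responseClass().getSimpleName());
    }
}
```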


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.
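The call list above can be summarized as (HTTP method, URL template) pairs. The paths below are illustrative guesses for the sake of the sketch, not the final Flink REST API:

```java
public class RestCallsSketch {
    // Each planned REST call, paired with a hypothetical URL template.
    enum Call {
        LIST_JOBS("GET", "/jobs"),
        SUBMIT_JOB("POST", "/jobs"),
        LOOKUP_JOB_LEADER("GET", "/jobs/:jobid/leader"),
        GET_JOB_STATUS("GET", "/jobs/:jobid/status"),
        CANCEL_JOB("PUT", "/jobs/:jobid/cancel"),
        STOP_JOB("PUT", "/jobs/:jobid/stop"),
        TAKE_SAVEPOINT("POST", "/jobs/:jobid/savepoints"),
        GET_KV_STATE("GET", "/jobs/:jobid/state/:key");

        final String method;
        final String path;

        Call(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    public static void main(String[] args) {
        for (Call c : Call.values()) {
            System.out.println(c.method + " " + c.path);
        }
    }
}
```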







[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137002#comment-16137002
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134528391
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ParameterMapper.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used to map query/path {@link Parameter}s to their actual 
value.
+ */
+public abstract class ParameterMapper {
+
+   /**
+* Maps the given query {@link Parameter}s to their actual value.
+*
+* @param queryParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapQueryParameters(Set<Parameter> queryParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Maps the given path {@link Parameter}s to their actual value.
+*
+* @param pathParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapPathParameters(Set<Parameter> pathParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Resolves the given URL (e.g "jobs/:jobid") using the given 
path/query parameters.
+*
+* @param genericUrl  URL to resolve
+* @param pathParameters  path parameters
+* @param queryParameters query parameters
+* @return resolved url, e.g "/jobs/1234?state=running"
+*/
+   public static String resolveUrl(String genericUrl, Map<Parameter, String> pathParameters, Map<Parameter, String> queryParameters) {
+   StringBuilder sb = new StringBuilder(genericUrl);
+
+   pathParameters.forEach((parameter, value) -> {
+   int start = sb.indexOf(":" + parameter.getKey());
+   sb.replace(start, start + parameter.getKey().length() + 
1, value);
+   });
--- End diff --

But we could, for example, count how many placeholders there are in the 
request URL and check that all of them have been replaced, couldn't we? Or is 
it also a valid request URL if a placeholder has not been replaced with a 
concrete value?


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.



[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136978#comment-16136978
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134523787
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ParameterMapper.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used to map query/path {@link Parameter}s to their actual 
value.
+ */
+public abstract class ParameterMapper {
+
+   /**
+* Maps the given query {@link Parameter}s to their actual value.
+*
+* @param queryParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapQueryParameters(Set<Parameter> queryParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Maps the given path {@link Parameter}s to their actual value.
+*
+* @param pathParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapPathParameters(Set<Parameter> pathParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Resolves the given URL (e.g "jobs/:jobid") using the given 
path/query parameters.
+*
+* @param genericUrl  URL to resolve
+* @param pathParameters  path parameters
+* @param queryParameters query parameters
+* @return resolved url, e.g "/jobs/1234?state=running"
+*/
+   public static String resolveUrl(String genericUrl, Map<Parameter, String> pathParameters, Map<Parameter, String> queryParameters) {
+   StringBuilder sb = new StringBuilder(genericUrl);
+
+   pathParameters.forEach((parameter, value) -> {
+   int start = sb.indexOf(":" + parameter.getKey());
+   sb.replace(start, start + parameter.getKey().length() + 
1, value);
+   });
--- End diff --

we can't check that in this static method since we neither know the 
original parameters nor can search for leftover parameters (since `:` is a 
valid character for a URL). The only place where we could do it is in the 
client, but it wouldn't be able to tell whether the replaced parts are correct 
anyway, so there might not be much value in checking.
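The trade-off under discussion can be sketched as follows. The `resolve` method below is a hypothetical stand-in for `ParameterMapper.resolveUrl` (same placeholder convention, not the actual Flink code), and the leftover check shows why detection is only heuristic: since `:` is a legal URL character, a `:name`-shaped token after resolution is a *likely* unresolved placeholder, not a proven one.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UrlResolveSketch {

    // Replaces ":name" placeholders in the template with concrete values.
    static String resolve(String template, Map<String, String> pathParams) {
        StringBuilder sb = new StringBuilder(template);
        pathParams.forEach((key, value) -> {
            int start = sb.indexOf(":" + key);
            if (start >= 0) {
                sb.replace(start, start + key.length() + 1, value);
            }
        });
        return sb.toString();
    }

    // Counts ":name"-shaped tokens remaining after resolution. This can only
    // flag likely leftovers; a literal ":" segment would also match.
    static long likelyLeftoverPlaceholders(String url) {
        Matcher m = Pattern.compile(":[a-zA-Z][a-zA-Z0-9]*").matcher(url);
        long count = 0;
        while (m.find()) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Only one of the two placeholders is supplied:
        String url = resolve("/jobs/:jobid/vertices/:vertexid", Map.of("jobid", "1234"));
        System.out.println(url);
        System.out.println(likelyLeftoverPlaceholders(url));
    }
}
```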


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications


[jira] [Assigned] (FLINK-6378) Implement FLIP-6 Flink-on-Mesos

2017-08-22 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  reassigned FLINK-6378:
---

Assignee: Eron Wright 

> Implement FLIP-6 Flink-on-Mesos
> ---
>
> Key: FLINK-6378
> URL: https://issues.apache.org/jira/browse/FLINK-6378
> Project: Flink
>  Issue Type: New Feature
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>  Labels: flip-6
>
> This is the parent issue for implementing Flink on Mesos using the new FLIP-6 
> architecture.
> This covers individual jobs running as Mesos frameworks, where the framework 
> and job lifetime are coupled.





[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136944#comment-16136944
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134517671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler: either a response of type {@code P}, if the incoming
+ * request was handled successfully, or an error message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

Yes it would require casting the response. I think this is more explicit 
and prevents the user from shooting himself in the foot.
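The container being discussed can be sketched as an either-style type (the names below are illustrative, not the actual Flink class): keeping the success accessor typed as `P` means callers branch on a success flag instead of downcasting an `Object`, which is the explicitness being argued for above.

```java
public final class EitherResponse<P> {
    private final P response;          // non-null on success
    private final String errorMessage; // non-null on failure
    private final int errorCode;

    private EitherResponse(P response, String errorMessage, int errorCode) {
        this.response = response;
        this.errorMessage = errorMessage;
        this.errorCode = errorCode;
    }

    static <P> EitherResponse<P> successful(P response) {
        return new EitherResponse<>(response, null, 0);
    }

    static <P> EitherResponse<P> error(String message, int code) {
        return new EitherResponse<>(null, message, code);
    }

    boolean wasSuccessful() {
        return response != null;
    }

    // Callers must check wasSuccessful() first; a typed accessor avoids the
    // unchecked cast a caller would otherwise need.
    P getResponse() {
        if (response == null) {
            throw new IllegalStateException("not a successful response");
        }
        return response;
    }

    String getErrorMessage() {
        return errorMessage;
    }

    public static void main(String[] args) {
        EitherResponse<String> ok = EitherResponse.successful("all jobs running");
        EitherResponse<String> bad = EitherResponse.error("job not found", 404);
        System.out.println(ok.wasSuccessful());
        System.out.println(ok.getResponse());
        System.out.println(bad.getErrorMessage());
    }
}
```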


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.







[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136943#comment-16136943
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134517389
  

[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134517389
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ParameterMapper;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClientEndpoint {
+   private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
+
+   private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+   private final String configuredTargetAddress;
+   private final int configuredTargetPort;
+   private final SSLEngine sslEngine;
+
+   private Bootstrap bootstrap;
+
+   private final ClientHandler handler = new ClientHandler();
+
+   private CompletableFuture<?> lastFuture = CompletableFuture.completedFuture(null);
+
+   private final Executor directExecutor = Executors.directExecutor();
+
+   public RestClientEndpoint(
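
The `lastFuture` field visible at the end of the diff suggests that requests are serialized by chaining each new request onto the completion of the previous one. A minimal self-contained sketch of that pattern (names illustrative, not the actual Flink implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the lastFuture chaining pattern: each submit() is chained onto
// the previously returned future, so requests go out strictly one after
// another. Illustrative names, not the real RestClientEndpoint.
final class SequentialSubmitter {
    private CompletableFuture<?> lastFuture = CompletableFuture.completedFuture(null);
    final List<String> sent = new ArrayList<>();

    synchronized CompletableFuture<String> submit(String request) {
        CompletableFuture<String> result = lastFuture
            .thenApply(ignored -> {
                sent.add(request);   // stands in for the actual network send
                return "ack:" + request;
            });
        lastFuture = result;         // the next call chains onto this one
        return result;
    }
}
```

The design choice here is ordering over throughput: a caller can still fire many submits without blocking, but the client guarantees the requests reach the wire in submission order.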

[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136925#comment-16136925
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134512942
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * This interface links {@link RequestBody} types to {@link ResponseBody} types and contains the metadata required
+ * for their HTTP headers.
+ *
+ * Implementations must be stateless.
+ *
+ * @param <R> request type
+ * @param <P> response type
+ */
+public interface MessageHeaders<R extends RequestBody, P extends ResponseBody> {
--- End diff --

The idea is to have the headers define which parameter mapper is used, to prevent an unrelated mapper from being passed in.
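
A hedged sketch of that idea — the headers object pinning the request type, the response type, and the parameter mapper together so an unrelated mapper cannot be supplied (names hypothetical, simplified from the interfaces under review):

```java
// Sketch of headers that tie request/response/mapper types together.
// Hypothetical names; the real Flink interfaces differ in detail.
interface Request {}
interface Response {}

interface Mapper<R extends Request> {
    String toPath(R request);
}

// The headers are the single source of truth: they name the request type,
// the response type, and the only mapper usable with them, so passing an
// unrelated mapper is a compile-time error rather than a runtime surprise.
interface Headers<R extends Request, P extends Response> {
    String getTargetUrl();
    Mapper<R> getParameterMapper();
}

final class JobDetailsRequest implements Request {
    final String jobId;
    JobDetailsRequest(String jobId) { this.jobId = jobId; }
}

final class JobDetailsResponse implements Response {}

final class JobDetailsHeaders implements Headers<JobDetailsRequest, JobDetailsResponse> {
    public String getTargetUrl() {
        return "/jobs/:jobid";
    }
    public Mapper<JobDetailsRequest> getParameterMapper() {
        return request -> getTargetUrl().replace(":jobid", request.jobId);
    }
}
```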


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.
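
The operation list quoted above maps naturally onto HTTP verb/path pairs. As a hedged sketch (the paths here are illustrative assumptions, not the final Flink REST API):

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative mapping of the cluster operations listed above to HTTP
// verbs and paths; the concrete paths are assumptions only.
enum ClusterOp { LIST_JOBS, SUBMIT_JOB, CANCEL_JOB, STOP_JOB, TAKE_SAVEPOINT }

final class RestRoutes {
    static final Map<ClusterOp, String> ROUTES = new EnumMap<>(ClusterOp.class);
    static {
        ROUTES.put(ClusterOp.LIST_JOBS,      "GET /jobs");
        ROUTES.put(ClusterOp.SUBMIT_JOB,     "POST /jobs");
        ROUTES.put(ClusterOp.CANCEL_JOB,     "PUT /jobs/:jobid/cancel");
        ROUTES.put(ClusterOp.STOP_JOB,       "PUT /jobs/:jobid/stop");
        ROUTES.put(ClusterOp.TAKE_SAVEPOINT, "POST /jobs/:jobid/savepoints");
    }
}
```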



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136922#comment-16136922
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134512163
  
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134512163
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ParameterMapper;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClientEndpoint {
+   private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
+
+   private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+   private final String configuredTargetAddress;
+   private final int configuredTargetPort;
+   private final SSLEngine sslEngine;
+
+   private Bootstrap bootstrap;
+
+   private final ClientHandler handler = new ClientHandler();
+
+   private CompletableFuture<?> lastFuture = CompletableFuture.completedFuture(null);
+
+   private final Executor directExecutor = Executors.directExecutor();
+
+   public RestClientEndpoint(RestCl

[jira] [Updated] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment

2017-08-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4048:

Issue Type: Bug  (was: Sub-task)
Parent: (was: FLINK-3957)

> Remove Hadoop dependencies from ExecutionEnvironment
> 
>
> Key: FLINK-4048
> URL: https://issues.apache.org/jira/browse/FLINK-4048
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
> Fix For: 2.0.0
>
>






[jira] [Assigned] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment

2017-08-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-4048:
---

Assignee: Aljoscha Krettek

> Remove Hadoop dependencies from ExecutionEnvironment
> 
>
> Key: FLINK-4048
> URL: https://issues.apache.org/jira/browse/FLINK-4048
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
> Fix For: 2.0.0
>
>






[jira] [Commented] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment

2017-08-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136915#comment-16136915
 ] 

Aljoscha Krettek commented on FLINK-4048:
-

I was experimenting with this as part of FLINK-2268 and noticed that the Hadoop-related methods on {{ExecutionEnvironment}} are {{@PublicEvolving}}, so I went ahead and removed them. Seems to work smoothly so far.

> Remove Hadoop dependencies from ExecutionEnvironment
> 
>
> Key: FLINK-4048
> URL: https://issues.apache.org/jira/browse/FLINK-4048
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API
>Reporter: Robert Metzger
> Fix For: 2.0.0
>
>






[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136912#comment-16136912
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134510178
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler that contains either a response of type {@code P}, if the incoming
+ * request was handled successfully, or an error message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

But even if we split this interface, a user who gets a `HandlerResponse` still has to check whether it was successful or not. Unless you suggest having an empty `HandlerResponse` interface and forcing users to do an instanceof check plus a cast every time.
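
The instanceof-plus-cast ceremony described above can be illustrated with a minimal sketch (hypothetical names, not the interfaces under review):

```java
// Sketch of the split-interface alternative: success and error become
// subtypes of an empty marker interface, and every call site must do an
// instanceof check plus a cast. All names here are hypothetical.
interface Reply {}

final class Success implements Reply {
    final String payload;
    Success(String payload) { this.payload = payload; }
}

final class Failure implements Reply {
    final String message;
    Failure(String message) { this.message = message; }
}

final class Caller {
    static String handle(Reply reply) {
        // The per-call-site ceremony the comment objects to:
        if (reply instanceof Success) {
            return ((Success) reply).payload;
        }
        return "error: " + ((Failure) reply).message;
    }
}
```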







[jira] [Assigned] (FLINK-2268) Provide Flink binary release without Hadoop

2017-08-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-2268:
---

Assignee: Aljoscha Krettek

> Provide Flink binary release without Hadoop
> ---
>
> Key: FLINK-2268
> URL: https://issues.apache.org/jira/browse/FLINK-2268
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> Currently, all Flink releases ship with Hadoop 2.3.0 binaries.
> The big Hadoop distributions are usually not relying on vanilla Hadoop 
> releases, but on custom patched versions.
> To provide the best user experience, we should offer a Flink binary that uses 
> the Hadoop jars provided by the user (i.e., their Hadoop distribution).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134510178
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

But even if we split this interface, a user that gets a `HandlerResponse` 
still has to do the same checks as to whether it was successful or not. Unless 
you suggest having an empty `HandlerResponse` interface and forcing users to 
do an instanceof+cast every time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
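The trade-off debated above — checked accessors that may throw versus an empty interface forcing instanceof+cast — can be sidestepped with a fold-style accessor that handles both cases in one call. A minimal standalone sketch of that idea (the `Result` name and its methods are illustrative placeholders, not Flink's actual `HandlerResponse` API):

```java
import java.util.Optional;
import java.util.function.Function;

// Illustrative either-style container: holds a successful response of type P
// or an error message with a code. fold() forces callers to handle both
// branches, so no instanceof checks or casts are ever needed.
final class Result<P> {
    private final P response;    // non-null iff the request succeeded
    private final String error;  // non-null iff the request failed
    private final int errorCode;

    private Result(P response, String error, int errorCode) {
        this.response = response;
        this.error = error;
        this.errorCode = errorCode;
    }

    static <P> Result<P> success(P response) {
        return new Result<>(response, null, 0);
    }

    static <P> Result<P> failure(String error, int errorCode) {
        return new Result<>(null, error, errorCode);
    }

    // Both cases handled at once; the compiler guarantees coverage.
    <T> T fold(Function<P, T> onSuccess, Function<String, T> onFailure) {
        return response != null ? onSuccess.apply(response) : onFailure.apply(error);
    }

    Optional<P> toOptional() {
        return Optional.ofNullable(response);
    }
}

public class ResultDemo {
    public static void main(String[] args) {
        Result<String> ok = Result.success("jobs: []");
        Result<String> bad = Result.failure("not found", 404);
        System.out.println(ok.fold(r -> "OK: " + r, e -> "ERR: " + e));   // prints OK: jobs: []
        System.out.println(bad.fold(r -> "OK: " + r, e -> "ERR: " + e));  // prints ERR: not found
    }
}
```

With this shape, accessing the wrong branch is impossible by construction, which is one way to resolve the concern about users forgetting to check the response type first.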


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134507274
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
 ---
@@ -155,38 +129,66 @@ public void shutdown() {
.set(HttpHeaders.Names.HOST, configuredTargetAddress + 
":" + configuredTargetPort)
.set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
 
-   synchronized (this) {
-   // This ensures strict sequential processing of 
requests.
-   // If we send new requests immediately we can no longer 
make assumptions about the order in which responses
-   // arrive, due to which the handler cannot know which 
future he should complete (not to mention what response
-   // type to read).
-   CompletableFuture nextFuture = lastFuture
-   .handleAsync((f, e) -> 
submitRequest(httpRequest, messageHeaders), directExecutor)
-   .thenCompose((future) -> future);
-
-   lastFuture = nextFuture;
-   return nextFuture;
-   }
+   return submitRequest(httpRequest, messageHeaders);
}
 
private , U extends ParameterMapper, 
R extends RequestBody, P extends ResponseBody> CompletableFuture 
submitRequest(FullHttpRequest httpRequest, M messageHeaders) {
-   CompletableFuture responseFuture = 
handler.expectResponse(messageHeaders.getResponseClass());
-
-   try {
-   // write request
-   Channel channel = 
bootstrap.connect(configuredTargetAddress, 
configuredTargetPort).sync().channel();
-   channel.writeAndFlush(httpRequest);
-   channel.closeFuture();
-   } catch (InterruptedException e) {
-   return FutureUtils.completedExceptionally(e);
+   synchronized (lock) {
+   CompletableFuture responseFuture = 
ClientHandler.addHandlerForResponse(bootstrap, sslEngine, 
messageHeaders.getResponseClass());
+
+   try {
+   // write request
+   Channel channel = 
bootstrap.connect(configuredTargetAddress, 
configuredTargetPort).sync().channel();
+   channel.writeAndFlush(httpRequest);
+   channel.closeFuture();
+   } catch (InterruptedException e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+   return responseFuture;
}
-   return responseFuture;
}
 
-   @ChannelHandler.Sharable
-   private static class ClientHandler extends 
SimpleChannelInboundHandler {
+   private static class RestChannelInitializer extends 
ChannelInitializer {
 
-   private volatile ExpectedResponse 
expectedResponse = null;
+   private final SSLEngine sslEngine;
+   private final ClientHandler handler;
+
+   public RestChannelInitializer(SSLEngine sslEngine, 
ClientHandler handler) {
+   this.sslEngine = sslEngine;
+   this.handler = handler;
+   }
+
+   @Override
+   protected void initChannel(SocketChannel ch) throws Exception {
+   // SSL should be the first handler in the pipeline
+   if (sslEngine != null) {
+   ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+   }
+
+   ch.pipeline()
+   .addLast(new HttpClientCodec())
+   .addLast(new HttpObjectAggregator(1024 * 1024))
+   .addLast(handler)
+   .addLast(new PipelineErrorHandler(LOG));
+   }
+   }
+
+   private static class ClientHandler extends 
SimpleChannelInboundHandler {
+
+   private final ExpectedResponse expectedResponse;
+
+   private ClientHandler(ExpectedResponse expectedResponse) {
+   this.expectedResponse = expectedResponse;
+   }
+
+   static  CompletableFuture 
addHandlerForResponse(Bootstrap bootStrap, SSLEngine sslEngine, Class 
expectedResponse) {
+   CompletableFuture responseFuture = new 
CompletableFuture<>();
+
+   ClientHandler handler = new ClientHandler<>(new 
ExpectedResponse<>(expectedResponse, responseFuture));
+   bootStrap.handler(new RestChannelInitializer(sslEngine, 
handler));
+
+   re

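The synchronized `lastFuture` chaining that this diff removes is a general technique for serializing asynchronous requests: each new request is composed onto the previous one's future, so responses arrive in submission order. A self-contained sketch of that pattern under those assumptions (the `SequentialExecutor` name is illustrative; this is not the Flink client itself):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Serializes async requests: a request's supplier is only invoked once the
// previously submitted request has completed (successfully or not).
class SequentialExecutor {
    private final Object lock = new Object();
    // Completed future to chain the first request onto.
    private CompletableFuture<Void> lastFuture = CompletableFuture.completedFuture(null);

    <P> CompletableFuture<P> submit(Supplier<CompletableFuture<P>> request) {
        synchronized (lock) {
            // handle() fires regardless of how the previous request ended;
            // thenCompose flattens the nested future returned by the supplier.
            CompletableFuture<P> next = lastFuture
                .handle((ignored, error) -> request.get())
                .thenCompose(f -> f);
            // Swallow this request's outcome so a failure does not poison the chain.
            lastFuture = next.handle((ignored, error) -> null);
            return next;
        }
    }
}

public class SequentialDemo {
    public static void main(String[] args) {
        SequentialExecutor exec = new SequentialExecutor();
        StringBuilder order = new StringBuilder();
        exec.submit(() ->
            CompletableFuture.supplyAsync(() -> { order.append("a"); return "A"; }));
        CompletableFuture<String> b = exec.submit(() ->
            CompletableFuture.supplyAsync(() -> { order.append("b"); return "B"; }));
        b.join();
        System.out.println(order); // prints ab: b's work only started after a finished
    }
}
```

The ordering guarantee is exactly what the removed comment relied on: since at most one request is in flight, the handler always knows which future to complete and which response type to read.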
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136898#comment-16136898
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134507146
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

See the checks when accessing the different fields. This can easily throw 
an exception if the user did not first check the type of the response.


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take a savepoint for the given job (How to return the 
> path under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
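The call list in the issue above maps naturally onto method/route pairs. A small sketch enumerating them (the HTTP methods come from the issue text; the concrete paths are illustrative placeholders, not Flink's actual routes):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Enumerates the REST surface from the issue as (call name -> "METHOD path").
// Paths such as /jobs/:jobid/... are hypothetical examples only.
public class RestCalls {
    public static Map<String, String> calls() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("List jobs", "GET /jobs");
        m.put("Submit job", "POST /jobs");                       // session mode only
        m.put("Lookup job leader", "GET /jobs/:jobid/leader");
        m.put("Get job status", "GET /jobs/:jobid/status");
        m.put("Cancel job", "PUT /jobs/:jobid/cancel");
        m.put("Stop job", "PUT /jobs/:jobid/stop");
        m.put("Take savepoint", "POST /jobs/:jobid/savepoints");
        m.put("Get KV state", "GET /jobs/:jobid/state/:key");
        m.put("Subscribe to notifications", "GET /jobs/:jobid/notifications");
        return m;
    }

    public static void main(String[] args) {
        calls().forEach((name, route) -> System.out.println(name + " -> " + route));
    }
}
```

Per the issue, the first four entries would be served by the cluster-entrypoint endpoint and the rest by the endpoint alongside the JobManager.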



[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134506551
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
 ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ParameterMapper;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClientEndpoint {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestClientEndpoint.class);
+
+   private static final ObjectMapper objectMapper = 
RestMapperUtils.getStrictObjectMapper();
+
+   private final String configuredTargetAddress;
+   private final int configuredTargetPort;
+   private final SSLEngine sslEngine;
+
+   private Bootstrap bootstrap;
+
+   private final ClientHandler handler = new ClientHandler();
+
+   private CompletableFuture lastFuture = 
CompletableFuture.completedFuture(null);
+
+   private final Executor directExecutor = Executors.directExecutor();
+
+   public RestClientEndpoint(

[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134505729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
 ---
@@ -155,38 +129,66 @@ public void shutdown() {
.set(HttpHeaders.Names.HOST, configuredTargetAddress + 
":" + configuredTargetPort)
.set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
 
-   synchronized (this) {
-   // This ensures strict sequential processing of 
requests.
-   // If we send new requests immediately we can no longer 
make assumptions about the order in which responses
-   // arrive, due to which the handler cannot know which 
future he should complete (not to mention what response
-   // type to read).
-   CompletableFuture nextFuture = lastFuture
-   .handleAsync((f, e) -> 
submitRequest(httpRequest, messageHeaders), directExecutor)
-   .thenCompose((future) -> future);
-
-   lastFuture = nextFuture;
-   return nextFuture;
-   }
+   return submitRequest(httpRequest, messageHeaders);
}
 
private , U extends ParameterMapper, 
R extends RequestBody, P extends ResponseBody> CompletableFuture 
submitRequest(FullHttpRequest httpRequest, M messageHeaders) {
-   CompletableFuture responseFuture = 
handler.expectResponse(messageHeaders.getResponseClass());
-
-   try {
-   // write request
-   Channel channel = 
bootstrap.connect(configuredTargetAddress, 
configuredTargetPort).sync().channel();
-   channel.writeAndFlush(httpRequest);
-   channel.closeFuture();
-   } catch (InterruptedException e) {
-   return FutureUtils.completedExceptionally(e);
+   synchronized (lock) {
+   CompletableFuture responseFuture = 
ClientHandler.addHandlerForResponse(bootstrap, sslEngine, 
messageHeaders.getResponseClass());
+
+   try {
+   // write request
+   Channel channel = 
bootstrap.connect(configuredTargetAddress, 
configuredTargetPort).sync().channel();
+   channel.writeAndFlush(httpRequest);
+   channel.closeFuture();
+   } catch (InterruptedException e) {
+   return FutureUtils.completedExceptionally(e);
+   }
+   return responseFuture;
}
-   return responseFuture;
}
 
-   @ChannelHandler.Sharable
-   private static class ClientHandler extends 
SimpleChannelInboundHandler {
+   private static class RestChannelInitializer extends ChannelInitializer<SocketChannel> {
 
-   private volatile ExpectedResponse 
expectedResponse = null;
+   private final SSLEngine sslEngine;
+   private final ClientHandler handler;
+
+   public RestChannelInitializer(SSLEngine sslEngine, 
ClientHandler handler) {
+   this.sslEngine = sslEngine;
+   this.handler = handler;
+   }
+
+   @Override
+   protected void initChannel(SocketChannel ch) throws Exception {
+   // SSL should be the first handler in the pipeline
+   if (sslEngine != null) {
+   ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+   }
+
+   ch.pipeline()
+   .addLast(new HttpClientCodec())
+   .addLast(new HttpObjectAggregator(1024 * 1024))
+   .addLast(handler)
+   .addLast(new PipelineErrorHandler(LOG));
+   }
+   }
+
+   private static class ClientHandler<P extends ResponseBody> extends SimpleChannelInboundHandler<FullHttpResponse> {
+
+   private final ExpectedResponse<P> expectedResponse;
+
+   private ClientHandler(ExpectedResponse<P> expectedResponse) {
+   this.expectedResponse = expectedResponse;
+   }
+
+   static <P extends ResponseBody> CompletableFuture<P> addHandlerForResponse(Bootstrap bootStrap, SSLEngine sslEngine, Class<P> expectedResponse) {
+   CompletableFuture<P> responseFuture = new CompletableFuture<>();
+
+   ClientHandler<P> handler = new ClientHandler<>(new ExpectedResponse<>(expectedResponse, responseFuture));
+   bootStrap.handler(new RestChannelInitializer(sslEngine, 
handler));
+
+   return r
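The synchronized block removed in this diff enforced strict sequential processing by chaining each new request onto the previous request's future. A minimal, stdlib-only sketch of that chaining pattern (class and method names are illustrative stand-ins, not Flink's actual code):

```java
import java.util.concurrent.CompletableFuture;

public class SequentialClient {

    private final Object lock = new Object();

    // Head of the chain; every new request is appended behind this future.
    private CompletableFuture<?> lastFuture = CompletableFuture.completedFuture(null);

    // Placeholder for the real asynchronous network call (hypothetical).
    private CompletableFuture<String> send(String request) {
        return CompletableFuture.supplyAsync(() -> "response-to-" + request);
    }

    public CompletableFuture<String> submit(String request) {
        synchronized (lock) {
            // Start this request only once the previous one has completed
            // (successfully or exceptionally), then flatten the nested future.
            CompletableFuture<String> next = lastFuture
                    .handle((ignoredResult, ignoredError) -> send(request))
                    .thenCompose(future -> future);
            lastFuture = next;
            return next;
        }
    }

    public static void main(String[] args) throws Exception {
        SequentialClient client = new SequentialClient();
        CompletableFuture<String> first = client.submit("a");
        CompletableFuture<String> second = client.submit("b");
        System.out.println(first.get());
        System.out.println(second.get());
    }
}
```

The lock only guards the swap of `lastFuture`; the ordering guarantee comes from `handle(...).thenCompose(...)`, which starts each send only after the previous future settles, so responses arrive in submission order.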

[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136888#comment-16136888
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134505357
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
 ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ParameterMapper;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClientEndpoint {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestClientEndpoint.class);
+
+   private static final ObjectMapper objectMapper = 
RestMapperUtils.getStrictObjectMapper();
+
+   private final String configuredTargetAddress;
+   private final int configuredTargetPort;
+   private final SSLEngine sslEngine;
+
+   private Bootstrap bootstrap;
+
+   private final ClientHandle

[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134505357
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
 ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ParameterMapper;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClientEndpoint {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RestClientEndpoint.class);
+
+   private static final ObjectMapper objectMapper = 
RestMapperUtils.getStrictObjectMapper();
+
+   private final String configuredTargetAddress;
+   private final int configuredTargetPort;
+   private final SSLEngine sslEngine;
+
+   private Bootstrap bootstrap;
+
+   private final ClientHandler handler = new ClientHandler();
+
+   private CompletableFuture lastFuture = 
CompletableFuture.completedFuture(null);
+
+   private final Executor directExecutor = Executors.directExecutor();
+
+   public RestClientEndpoint(RestCl
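The review above moves from one shared `ClientHandler` field toward a per-request handler that pairs the expected response class with a future to complete. A stdlib-only sketch of that pairing (`ExpectedResponseSketch` is a stand-in; Flink's actual `ExpectedResponse` may differ):

```java
import java.util.concurrent.CompletableFuture;

// Stand-in: pairs an expected response type with a future, so a channel
// handler can complete the right typed future once the payload is decoded.
public class ExpectedResponseSketch<P> {

    private final Class<P> responseClass;
    private final CompletableFuture<P> responseFuture = new CompletableFuture<>();

    public ExpectedResponseSketch(Class<P> responseClass) {
        this.responseClass = responseClass;
    }

    public CompletableFuture<P> getResponseFuture() {
        return responseFuture;
    }

    // Called by the handler with the decoded but untyped payload.
    public void complete(Object rawResponse) {
        if (responseClass.isInstance(rawResponse)) {
            responseFuture.complete(responseClass.cast(rawResponse));
        } else {
            responseFuture.completeExceptionally(new IllegalStateException(
                    "Expected " + responseClass.getName()
                            + " but got " + rawResponse.getClass().getName()));
        }
    }

    public static void main(String[] args) throws Exception {
        ExpectedResponseSketch<String> expected = new ExpectedResponseSketch<>(String.class);
        expected.complete("ok");
        System.out.println(expected.getResponseFuture().get());
    }
}
```

The `Class#isInstance`/`Class#cast` pair performs the runtime check that the generic type alone cannot, which is what makes a per-request expected type safer than a single handler guessing which response belongs to which caller.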

[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136873#comment-16136873
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134502397
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

I can change that, but I don't see how this is superior regarding type 
safety.


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> path under which the savepoint was stored? Maybe always having to
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
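`HandlerResponse`, as described in the javadoc quoted above, is an either-style container: a payload of type `P` on success, or an error message plus status code on failure. A minimal stdlib sketch of such a container (names and the use of an `int` status code are illustrative, not Flink's actual implementation):

```java
// Either-style container: holds exactly one of a successful payload or an
// error message with an HTTP-like status code.
public final class HandlerResult<P> {

    private final P response;   // non-null iff the request succeeded
    private final String error; // non-null iff the request failed
    private final int statusCode;

    private HandlerResult(P response, String error, int statusCode) {
        this.response = response;
        this.error = error;
        this.statusCode = statusCode;
    }

    public static <P> HandlerResult<P> success(P response) {
        return new HandlerResult<>(response, null, 200);
    }

    public static <P> HandlerResult<P> error(String message, int statusCode) {
        return new HandlerResult<>(null, message, statusCode);
    }

    public boolean isSuccess() {
        return response != null;
    }

    public P getResponse() {
        if (response == null) {
            throw new IllegalStateException("not a successful response: " + error);
        }
        return response;
    }

    public String getError() {
        return error;
    }

    public int getStatusCode() {
        return statusCode;
    }

    public static void main(String[] args) {
        HandlerResult<String> ok = HandlerResult.success("payload");
        HandlerResult<String> notFound = HandlerResult.error("no such job", 404);
        System.out.println(ok.isSuccess());
        System.out.println(notFound.getStatusCode());
    }
}
```

The static factories keep the "exactly one of the two" invariant in one place, which is the type-safety question debated in the comment: the compiler cannot enforce the invariant, only the constructor discipline can.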


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134502397
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

I can change that, but I don't see how this is superior regarding type 
safety.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7489) Remove job lifecycle methods from public JobMasterGateway interface

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136872#comment-16136872
 ] 

ASF GitHub Bot commented on FLINK-7489:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4573

[FLINK-7489] Remove startJobExecution and suspendExecution from 
JobMasterGateway

## What is the purpose of the change

The job lifecycle methods `startJobExecution` and `suspendExecution` should 
not be exposed as RPCs. Therefore, this commit removes them from the 
JobMasterGateway definition.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink removeJMRPCs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4573


commit 4b1088243a24fc3791c56eff37e1ca5fada8afbd
Author: Till Rohrmann 
Date:   2017-08-22T14:33:05Z

[FLINK-7489] Remove startJobExecution and suspendExecution from 
JobMasterGateway

The job lifecycle methods should not be exposed as RPCs. Therefore, this 
commit
removes them from the JobMasterGateway definition.




> Remove job lifecycle methods from public JobMasterGateway interface
> ---
>
> Key: FLINK-7489
> URL: https://issues.apache.org/jira/browse/FLINK-7489
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{JobMaster}} exposes its job lifecycle methods {{startJobExecution}} and 
> {{suspendExecution}} via its {{JobMasterGateway}} to the public. I think 
> these methods should not be exposed because it allows other components to 
> affect the job execution without proper reason. Only the {{JobManagerRunner}} 
> should be responsible for calling these methods and has direct access to the 
> {{JobMaster}} instance. Therefore, these methods can directly be implemented 
> by the {{JobMaster}} without exposing them via the {{JobMasterGateway}}.





[GitHub] flink pull request #4573: [FLINK-7489] Remove startJobExecution and suspendE...

2017-08-22 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4573

[FLINK-7489] Remove startJobExecution and suspendExecution from 
JobMasterGateway

## What is the purpose of the change

The job lifecycle methods `startJobExecution` and `suspendExecution` should 
not be exposed as RPCs. Therefore, this commit removes them from the 
JobMasterGateway definition.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink removeJMRPCs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4573


commit 4b1088243a24fc3791c56eff37e1ca5fada8afbd
Author: Till Rohrmann 
Date:   2017-08-22T14:33:05Z

[FLINK-7489] Remove startJobExecution and suspendExecution from 
JobMasterGateway

The job lifecycle methods should not be exposed as RPCs. Therefore, this 
commit
removes them from the JobMasterGateway definition.






[jira] [Created] (FLINK-7489) Remove job lifecycle methods from public JobMasterGateway interface

2017-08-22 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7489:


 Summary: Remove job lifecycle methods from public JobMasterGateway 
interface
 Key: FLINK-7489
 URL: https://issues.apache.org/jira/browse/FLINK-7489
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


The {{JobMaster}} exposes its job lifecycle methods {{startJobExecution}} and 
{{suspendExecution}} via its {{JobMasterGateway}} to the public. I think these 
methods should not be exposed because it allows other components to affect the 
job execution without proper reason. Only the {{JobManagerRunner}} should be 
responsible for calling these methods and has direct access to the 
{{JobMaster}} instance. Therefore, these methods can directly be implemented by 
the {{JobMaster}} without exposing them via the {{JobMasterGateway}}.



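The change described above is a plain interface-segregation move: the lifecycle methods become ordinary methods on the concrete class, reachable only by the component that owns the instance, while remote callers see the narrowed gateway. A generic sketch under those assumptions (all names are illustrative, not Flink's actual classes):

```java
// Remote callers only ever see this narrowed view.
interface MasterGateway {
    String requestStatus();
}

class Master implements MasterGateway {

    private boolean running;

    // Not declared on the gateway: callable only via the concrete instance.
    void startExecution() {
        running = true;
    }

    void suspendExecution() {
        running = false;
    }

    @Override
    public String requestStatus() {
        return running ? "RUNNING" : "SUSPENDED";
    }
}

class Runner {

    private final Master master = new Master();

    MasterGateway startAndGetGateway() {
        master.startExecution(); // direct call, not an RPC
        return master;           // expose only the gateway view
    }
}

public class GatewaySketch {
    public static void main(String[] args) {
        MasterGateway gateway = new Runner().startAndGetGateway();
        System.out.println(gateway.requestStatus());
        // gateway.startExecution(); // would not compile: not on the gateway
    }
}
```

Because `startExecution` and `suspendExecution` are absent from the interface, no holder of a `MasterGateway` reference can affect the lifecycle, which mirrors the rationale in the issue description.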


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136868#comment-16136868
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134499984
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Generic response body for communicating errors on the server.
+ */
+public final class ErrorResponseBody implements ResponseBody {
+
+   static final String FIELD_NAME_ERRORS = "errors";
+
+   @JsonProperty(FIELD_NAME_ERRORS)
+   public final List<String> errors;
--- End diff --

Alright, then leave it like it is.


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> path under which the savepoint was stored? Maybe always having to
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.





[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134499984
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Generic response body for communicating errors on the server.
+ */
+public final class ErrorResponseBody implements ResponseBody {
+
+   static final String FIELD_NAME_ERRORS = "errors";
+
+   @JsonProperty(FIELD_NAME_ERRORS)
+   public final List<String> errors;
--- End diff --

Alright, then leave it like it is.


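The `java.util.Collections` import in the `ErrorResponseBody` diff above suggests the errors list is wrapped defensively. A stdlib-only sketch of such an immutable response body (Jackson annotations omitted; the `String` element type is an assumption, since the generic parameter was lost in the archived diff):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Immutable error body: the list is defensively copied and wrapped so the
// public final field cannot be mutated by callers after construction.
public final class ErrorBodySketch {

    public final List<String> errors;

    public ErrorBodySketch(List<String> errors) {
        this.errors = Collections.unmodifiableList(new ArrayList<>(errors));
    }

    // Convenience constructor for the common single-error case.
    public ErrorBodySketch(String error) {
        this(Collections.singletonList(error));
    }

    public static void main(String[] args) {
        ErrorBodySketch body = new ErrorBodySketch("job not found");
        System.out.println(body.errors);
        try {
            body.errors.add("another");
        } catch (UnsupportedOperationException expected) {
            System.out.println("errors list is immutable");
        }
    }
}
```

Copying before wrapping matters: wrapping the caller's list alone would still let the caller mutate it through the original reference.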


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136865#comment-16136865
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134499043
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Generic response body for communicating errors on the server.
+ */
+public final class ErrorResponseBody implements ResponseBody {
+
+   static final String FIELD_NAME_ERRORS = "errors";
+
+   @JsonProperty(FIELD_NAME_ERRORS)
+   public final List<String> errors;
--- End diff --

it's part of the API, so we either add it now or break it later.


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running along side to the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
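The REST calls listed in the issue description above can be sketched as a method/path table. This is purely illustrative: FLINK-7040 does not fix concrete URLs, so every path below is a hypothetical placeholder, not Flink's actual route layout.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the REST surface described in the issue above.
// All paths are invented placeholders; only the method/operation pairing
// comes from the issue text.
class RestSurfaceSketch {
    enum Method { GET, POST, PUT }

    static Map<String, Method> routes() {
        Map<String, Method> routes = new LinkedHashMap<>();
        routes.put("/jobs", Method.GET);                    // list all running jobs
        routes.put("/jobs/submit", Method.POST);            // submit job (session mode only)
        routes.put("/jobs/:jobid/leader", Method.GET);      // look up JM leader for the job
        routes.put("/jobs/:jobid/status", Method.GET);      // job status / execution result
        routes.put("/jobs/:jobid/cancel", Method.PUT);      // cancel the job
        routes.put("/jobs/:jobid/stop", Method.PUT);        // stop the job
        routes.put("/jobs/:jobid/savepoints", Method.POST); // trigger a savepoint
        routes.put("/jobs/:jobid/state/:key", Method.GET);  // queryable KV state
        return routes;
    }
}
```

The first four operations would be registered on the cluster-entrypoint endpoint, the rest on the JobManager-side endpoint, per the description.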




[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136853#comment-16136853
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134494673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Generic response body for communicating errors on the server.
+ */
+public final class ErrorResponseBody implements ResponseBody {
+
+   static final String FIELD_NAME_ERRORS = "errors";
+
+   @JsonProperty(FIELD_NAME_ERRORS)
+   public final List<String> errors;
--- End diff --

But it's not yet used, and it's somewhat misleading that it was initialized 
with a singleton list before. I suggest removing it until it's needed.






[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136817#comment-16136817
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134485505
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Generic response body for communicating errors on the server.
+ */
+public final class ErrorResponseBody implements ResponseBody {
+
+   static final String FIELD_NAME_ERRORS = "errors";
+
+   @JsonProperty(FIELD_NAME_ERRORS)
+   public final List<String> errors;
--- End diff --

The list constructor isn't meant to be private. The idea is to allow us to 
provide multiple error messages, which can be useful if a handler performs 
multiple actions in parallel and several of them fail.
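A response carrying several errors would look like this on the wire. This is a minimal stand-in using only the JDK: the real `ErrorResponseBody` is serialized by Jackson, and its constructors are not visible in the truncated diff, so the constructor and `toJson()` here are assumptions for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Minimal stand-in for ErrorResponseBody: a list of error strings under "errors".
// The constructor and hand-rolled toJson() are illustrative only; Flink uses a
// strict Jackson ObjectMapper for (de)serialization.
class ErrorBodySketch {
    final List<String> errors;

    ErrorBodySketch(List<String> errors) {
        this.errors = errors;
    }

    String toJson() {
        // Render {"errors":["...","..."]} without escaping, for demonstration.
        return errors.stream()
            .map(e -> '"' + e + '"')
            .collect(Collectors.joining(",", "{\"errors\":[", "]}"));
    }
}
```

With two failed parallel actions, `new ErrorBodySketch(List.of("op A failed", "op B failed")).toJson()` yields `{"errors":["op A failed","op B failed"]}` — one response body, multiple error messages.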






[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136772#comment-16136772
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134474895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Super class for netty-based handlers that work with {@link 
RequestBody}s and {@link ResponseBody}s.
+ *
+ * Subclasses must be thread-safe.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+@ChannelHandler.Sharable
+public abstract class AbstractRestHandler<R extends RequestBody, P extends ResponseBody> extends SimpleChannelInboundHandler<Routed> {
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   private static final ObjectMapper mapper = 
RestMapperUtils.getStrictObjectMapper();
+
+   private final MessageHeaders<R, P> messageHeaders;
+
+   protected AbstractRestHandler(MessageHeaders<R, P> messageHeaders) {
+   this.messageHeaders = messageHeaders;
+   }
+
+   public MessageHeaders<R, P> getMessageHeaders() {
+   return messageHeaders;
+   }
+
+   @Override
+   protected void ch
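The request/response typing that the (truncated) diff above sets up can be reduced to the following shape. This is a sketch of the pattern only, not Flink's actual class: the handler is generic over a request type and a response type, and handling returns a future so the response can be completed asynchronously. The `Req`/`Resp` marker interfaces and `receive()` method are stand-ins invented for this example.

```java
import java.util.concurrent.CompletableFuture;

// Marker interfaces mirroring the roles of Flink's RequestBody / ResponseBody.
interface Req {}
interface Resp {}

// The core shape of an AbstractRestHandler-style class: the netty callback
// (stubbed here as receive()) parses the body into R, delegates to the
// abstract handler, and the eventual P is serialized back to the client.
abstract class HandlerSketch<R extends Req, P extends Resp> {
    // Subclasses implement the actual (possibly asynchronous) request handling.
    protected abstract CompletableFuture<P> handleRequest(R request);

    // Stand-in for channelRead0(): here we simply invoke the handler directly.
    CompletableFuture<P> receive(R request) {
        return handleRequest(request);
    }
}
```

The generics keep each concrete handler compile-time bound to exactly one request and one response message type, which is what lets the endpoint (de)serialize bodies with a strict ObjectMapper.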

[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134474815
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestServerEndpoint}s.
+ */
+public final class RestServerEndpointConfiguration {
+
+   private final String restBindAddress;
+   private final int restBindPort;
+   private final SSLEngine sslEngine;
+
+   private RestServerEndpointConfiguration(String restBindAddress, int 
targetRestEndpointPort, SSLEngine sslEngine) {
+   this.restBindAddress = restBindAddress;
+   this.restBindPort = targetRestEndpointPort;
+   this.sslEngine = sslEngine;
--- End diff --

Then please annotate them with `@Nullable`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
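The suggestion above boils down to documenting which configuration fields may legally be null. A self-contained sketch of the resulting shape follows; note that the `Nullable` annotation is declared locally so the example compiles on its own, whereas Flink itself uses `javax.annotation.Nullable`, and the field/parameter names mirror the diff but the `Object` type for the SSL engine is a simplification.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in for javax.annotation.Nullable, so the sketch is self-contained.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Nullable {}

// Sketch of the configuration object: bind address and SSL engine may be
// absent (no explicit address configured / SSL disabled), so those fields
// are annotated to make the null contract explicit to callers.
final class EndpointConfigSketch {
    @Nullable final String restBindAddress;
    final int restBindPort;
    @Nullable final Object sslEngine; // null when SSL is disabled

    EndpointConfigSketch(String restBindAddress, int restBindPort, Object sslEngine) {
        this.restBindAddress = restBindAddress;
        this.restBindPort = restBindPort;
        this.sslEngine = sslEngine;
    }
}
```

Annotating the fields costs nothing at runtime but lets static analysis (and readers) distinguish "never null" from "null means disabled".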





[jira] [Closed] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-08-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-5851.

Resolution: Fixed

Fixed via 40cec17f4303b43bbf65d8be542f0646eada57e8

> Renaming AsyncCollector into ResultPromise/ResultFuture
> ---
>
> Key: FLINK-5851
> URL: https://issues.apache.org/jira/browse/FLINK-5851
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>  Labels: api-breaking
> Fix For: 1.4.0
>
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an 
> {{AsyncFunction}} implementation. The name does not really reflect what the 
> {{AsyncCollector}} does, since it does not collect but is actually a one-time 
> completable future. Therefore, I propose to rename {{AsyncCollector}} 
> to {{ResultPromise}} or {{ResultFuture}}. This is an API-breaking change.





[jira] [Updated] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-08-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5851:
-
Labels: api-breaking  (was: )

> Renaming AsyncCollector into ResultPromise/ResultFuture
> ---
>
> Key: FLINK-5851
> URL: https://issues.apache.org/jira/browse/FLINK-5851
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>  Labels: api-breaking
> Fix For: 1.4.0
>
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an 
> {{AsyncFunction}} implementation. The name does not really reflect what the 
> {{AsyncCollector}} does, since it does not collect but is actually a one-time 
> completable future. Therefore, I propose to rename {{AsyncCollector}} 
> to {{ResultPromise}} or {{ResultFuture}}. This is an API-breaking change.





[jira] [Reopened] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-08-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-5851:
--

> Renaming AsyncCollector into ResultPromise/ResultFuture
> ---
>
> Key: FLINK-5851
> URL: https://issues.apache.org/jira/browse/FLINK-5851
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>  Labels: api-breaking
> Fix For: 1.4.0
>
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an 
> {{AsyncFunction}} implementation. The name does not really reflect what the 
> {{AsyncCollector}} does, since it does not collect but is actually a one-time 
> completable future. Therefore, I propose to rename {{AsyncCollector}} 
> to {{ResultPromise}} or {{ResultFuture}}. This is an API-breaking change.





[jira] [Closed] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-08-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-5851.

  Resolution: Fixed
Release Note: This change breaks the API of the AsyncFunction, which is now 
called with a {{ResultFuture}} instead of an {{AsyncCollector}}. To complete 
the future with the result, one has to call {{ResultFuture#complete}}, or 
{{ResultFuture#completeExceptionally}} in case of an exceptional completion.

Fixed via 40cec17f4303b43bbf65d8be542f0646eada57e8
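The completion contract described in the release note can be sketched as follows. This is a minimal, self-contained stand-in: the real {{ResultFuture}} lives in Flink's streaming API and its exact signature may differ, and {{UppercaseAsyncFunction}} is a hypothetical example function, not part of Flink.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Minimal stand-in for the renamed interface. The real ResultFuture is part
// of Flink's streaming API; its exact signature may differ from this sketch.
interface ResultFuture<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

// Hypothetical async function: completes the future from another thread,
// the way an AsyncFunction implementation would once its I/O returns.
class UppercaseAsyncFunction {
    void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture.runAsync(() -> {
            if (input == null) {
                resultFuture.completeExceptionally(new IllegalArgumentException("null input"));
            } else {
                resultFuture.complete(Collections.singletonList(input.toUpperCase()));
            }
        });
    }
}

public class ResultFutureSketch {
    public static void main(String[] args) throws Exception {
        // Bridge the sketch interface onto a CompletableFuture so the caller
        // can wait for the asynchronous completion.
        CompletableFuture<Collection<String>> sink = new CompletableFuture<>();
        ResultFuture<String> resultFuture = new ResultFuture<String>() {
            public void complete(Collection<String> result) { sink.complete(result); }
            public void completeExceptionally(Throwable error) { sink.completeExceptionally(error); }
        };
        new UppercaseAsyncFunction().asyncInvoke("flink", resultFuture);
        System.out.println(sink.get()); // prints [FLINK]
    }
}
```

As the rename suggests, the contract is one-shot: exactly one of {{complete}} or {{completeExceptionally}} is called per invocation, unlike a collector that may be fed repeatedly.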

> Renaming AsyncCollector into ResultPromise/ResultFuture
> ---
>
> Key: FLINK-5851
> URL: https://issues.apache.org/jira/browse/FLINK-5851
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an 
> {{AsyncFunction}} implementation. The name does not really reflect what the 
> {{AsyncCollector}} does, since it does not collect but is actually a one-time 
> completable future. Therefore, I propose to rename {{AsyncCollector}} 
> to {{ResultPromise}} or {{ResultFuture}}. This is an API-breaking change.





[jira] [Commented] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136764#comment-16136764
 ] 

ASF GitHub Bot commented on FLINK-5851:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4243


> Renaming AsyncCollector into ResultPromise/ResultFuture
> ---
>
> Key: FLINK-5851
> URL: https://issues.apache.org/jira/browse/FLINK-5851
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an 
> {{AsyncFunction}} implementation. The name does not really reflect what the 
> {{AsyncCollector}} does, since it does not collect but is actually a one-time 
> completable future. Therefore, I propose to rename {{AsyncCollector}} 
> to {{ResultPromise}} or {{ResultFuture}}. This is an API-breaking change.





[GitHub] flink pull request #4243: [FLINK-5851] [streaming API] Rename AsyncCollector...

2017-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4243




[jira] [Assigned] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture

2017-08-22 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-5851:


Assignee: mingleizhang

> Renaming AsyncCollector into ResultPromise/ResultFuture
> ---
>
> Key: FLINK-5851
> URL: https://issues.apache.org/jira/browse/FLINK-5851
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an 
> {{AsyncFunction}} implementation. The name does not really reflect what the 
> {{AsyncCollector}} does, since it does not collect but is actually a one-time 
> completable future. Therefore, I propose to rename {{AsyncCollector}} 
> to {{ResultPromise}} or {{ResultFuture}}. This is an API-breaking change.





  1   2   3   >