[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output
[ https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137972#comment-16137972 ]

ASF GitHub Bot commented on FLINK-6244:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4320

> Emit timeouted Patterns as Side Output
> --------------------------------------
>
>                 Key: FLINK-6244
>                 URL: https://issues.apache.org/jira/browse/FLINK-6244
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>             Fix For: 1.4.0
>
>
> Now that we have SideOutputs I think timed-out patterns should be emitted into
> them rather than producing a stream of `Either`

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
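The routing idea in the issue description can be illustrated with a toy model. This is only a sketch in plain Java: `TimeoutSideOutputSketch`, `PartialMatch`, `Outputs` and `route` are hypothetical names made up for illustration and are not Flink CEP classes. It assumes a partial match times out once it is older than a `withinMillis` window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: every type here is a hypothetical stand-in, not a Flink class. */
final class TimeoutSideOutputSketch {

    /** A partial match: its key, the time its first event arrived, and whether it completed. */
    static final class PartialMatch {
        final String key;
        final long startTime;
        final boolean completed;

        PartialMatch(String key, long startTime, boolean completed) {
            this.key = key;
            this.startTime = startTime;
            this.completed = completed;
        }
    }

    /** Two separate outputs instead of one stream of Either-like values. */
    static final class Outputs {
        final List<String> matched = new ArrayList<>();  // main output
        final List<String> timedOut = new ArrayList<>(); // side output
    }

    /** Routes completed matches to the main output and expired partial matches to the side output. */
    static Outputs route(List<PartialMatch> matches, long now, long withinMillis) {
        Outputs out = new Outputs();
        for (PartialMatch m : matches) {
            if (m.completed) {
                out.matched.add(m.key);
            } else if (now - m.startTime > withinMillis) {
                out.timedOut.add(m.key);
            }
            // Otherwise the match is still pending and nothing is emitted yet.
        }
        return out;
    }

    public static void main(String[] args) {
        Outputs out = route(Arrays.asList(
                new PartialMatch("a", 0L, true),
                new PartialMatch("b", 0L, false)), 100L, 50L);
        System.out.println("matched=" + out.matched + " timedOut=" + out.timedOut);
    }
}
```

With two outputs, a consumer that only cares about full matches never has to unwrap an `Either`-style value.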
[GitHub] flink issue #4320: [FLINK-6244] Emit timeouted Patterns as Side Output
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4320

    I will merge it then.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137878#comment-16137878 ]

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user kaibozhou commented on the issue:

    https://github.com/apache/flink/pull/4355

    @fhueske @wuchong Thank you for your suggestions. I have updated the PR and added test cases.
    Do you have time to look at this?

    Thanks, Kaibo

> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
>                 Key: FLINK-7206
>                 URL: https://issues.apache.org/jira/browse/FLINK-7206
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.
[jira] [Commented] (FLINK-7129) Dynamically changing patterns
[ https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137864#comment-16137864 ]

Dian Fu commented on FLINK-7129:
--------------------------------

Hi [~dawidwys],

I thought about the issue you mentioned for a few days and think it may not be a big problem. The patterns belong to all keys and can be stored in {{OperatorStateBackend}}. Then there is no need to distribute patterns to all keys; we just need to distribute them to all operator instances. Please correct me if my understanding is not correct.

Thanks a lot.

> Dynamically changing patterns
> -----------------------------
>
>                 Key: FLINK-7129
>                 URL: https://issues.apache.org/jira/browse/FLINK-7129
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through
> coStream
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137860#comment-16137860 ]

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user kaibozhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r134653998

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable class to the member area of the generated [[Function]].
    +    */
    +  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
    +    val field =
    +      s"""
    +         |transient ${clazz.getCanonicalName} $fieldTerm = null;
    +         |""".stripMargin
    +    reusableMemberStatements.add(field)
    +  }
    +
    +  /**
    +    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
    +    *
    +    * @param indices indices of aggregate functions.
    +    * @param ctxTerm field name of runtime context.
    +    * @param accConfig data view config which contains id, field and StateDescriptos.
    +    * @return statements to create [[MapView]] or [[ListView]].
    +    */
    +  def addReusableDataViewConfig(
    +      indices: Range,
    +      ctxTerm: String,
    +      accConfig: Option[DataViewConfig])
    +    : String = {
    +    if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
    +      val initDataViews = new StringBuilder
    +      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
    +        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
    +        .toMap[String, StateDescriptor[_, _]]
    +
    +      for (i <- indices) yield {
    +        for (spec <- accConfig.get.accSpecs(i)) yield {
    +          val dataViewField = spec.field
    +          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
    +          val desc = descMapping.getOrElse(spec.id,
    +            throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
    +
    +          val serializedData = AggregateUtil.serialize(desc)
    +          val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
    +          val field =
    +            s"""
    +               |transient $dataViewTypeTerm $dataViewFieldTerm = null;
    +               |""".stripMargin
    +          reusableMemberStatements.add(field)
    +
    +          val descFieldTerm = s"${dataViewFieldTerm}_desc"
    +          val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
    +          val descDeserialize =
    +            s"""
    +               |$descClassQualifier $descFieldTerm = ($descClassQualifier)
    +               |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
    +               |  .deserialize("$serializedData");
    +             """.stripMargin
    +
    +          val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
    +            s"""
    +               |$descDeserialize
    +               |$dataViewFieldTerm =
    +               |  org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
    --- End diff --

    Agreed, it can be code-generated now.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r134653928

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable class to the member area of the generated [[Function]].
    +    */
    +  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
    +    val field =
    +      s"""
    +         |transient ${clazz.getCanonicalName} $fieldTerm = null;
    +         |""".stripMargin
    +    reusableMemberStatements.add(field)
    +  }
    +
    +  /**
    +    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
    +    *
    +    * @param indices indices of aggregate functions.
    +    * @param ctxTerm field name of runtime context.
    +    * @param accConfig data view config which contains id, field and StateDescriptos.
    +    * @return statements to create [[MapView]] or [[ListView]].
    +    */
    +  def addReusableDataViewConfig(
    --- End diff --

    It's a good idea; the code will be cleaner after the refactoring.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137854#comment-16137854 ]

ASF GitHub Bot commented on FLINK-7206:
---------------------------------------

Github user kaibozhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r134653871

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable class to the member area of the generated [[Function]].
    +    */
    +  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
    --- End diff --

    Yes, RuntimeContext does not need to be a member variable.
[jira] [Assigned] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fang Yong reassigned FLINK-6864:
--------------------------------

    Assignee: Fang Yong

> Remove confusing "invalid POJO type" messages from TypeExtractor
> ----------------------------------------------------------------
>
>                 Key: FLINK-6864
>                 URL: https://issues.apache.org/jira/browse/FLINK-6864
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Fang Yong
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will
> log warnings such as ".. must have a default constructor to be used as a
> POJO.", " ... is not a valid POJO type because not all fields are valid POJO
> fields." in the {{analyzePojo}} method.
> These messages often mislead users into thinking that the job should have
> failed, whereas in fact in these cases Flink just falls back to Kryo and
> treats the types as generic types. We should remove these messages, and at
> the same time improve the type serialization docs at [1] to explicitly
> explain what it means when Flink does / does not recognize a user type as a
> POJO.
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types
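To make the two quoted warnings concrete, here is a rough reflection-based check written for this note. It is a simplified approximation and not the logic of Flink's {{TypeExtractor}}: the class `PojoRulesSketch` and its helper names are hypothetical, only fields declared directly on the class are inspected, and skipping static and transient fields is an assumption of the sketch.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Simplified sketch; hypothetical helper, not Flink's TypeExtractor. */
final class PojoRulesSketch {

    /** Returns true if the class passes the simplified POJO checks below. */
    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false; // the class itself must be public
        }
        try {
            clazz.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false; // corresponds to "must have a default constructor"
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue; // assumption of this sketch: such fields are ignored
            }
            if (!Modifier.isPublic(mods) && !hasGetterAndSetter(clazz, field)) {
                return false; // corresponds to "not all fields are valid POJO fields"
            }
        }
        return true;
    }

    /** Looks for public getX() and setX(T) methods matching the field. */
    private static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Passes: public class, public no-arg constructor, accessible fields. */
    public static class Good {
        public int id;
        private String name;

        public Good() {}

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }
    }

    /** Fails: no public no-argument constructor. */
    public static class NoDefaultConstructor {
        public int id;

        public NoDefaultConstructor(int id) { this.id = id; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Good.class));
        System.out.println(looksLikePojo(NoDefaultConstructor.class));
    }
}
```

As the issue notes, a type that fails such checks is not an error in Flink: it is handled as a generic type and serialized with Kryo.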
[GitHub] flink issue #4574: [FLINK-6864] Fix confusing "invalid POJO type" messages f...
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4574

    @tzulitai I created this PR to fix [https://issues.apache.org/jira/browse/FLINK-6864](https://issues.apache.org/jira/browse/FLINK-6864). I think adding logs instead of removing them would be better. What do you think? Thanks
[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137827#comment-16137827 ]

ASF GitHub Bot commented on FLINK-6864:
---------------------------------------

GitHub user zjureel opened a pull request:

    https://github.com/apache/flink/pull/4574

    [FLINK-6864] Fix confusing "invalid POJO type" messages from TypeExtractor

    ## What is the purpose of the change

    Fix confusing "invalid POJO type" messages from TypeExtractor

    ## Brief change log

      - *Improve log about pojo in TypeExtractor*
      - *Add notice in types_serialization.md about rules-for-pojo-types*

    ## Verifying this change

    This change needs no testing

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (yes / no / don't know)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6864

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4574.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4574

commit 5b3861716404c1d1cce9f18af7ca596a21ecb014
Author: zjureel
Date:   2017-08-23T02:30:31Z

    [FLINK-6864] Fix confusing "invalid POJO type" messages from TypeExtractor
[jira] [Commented] (FLINK-7268) Zookeeper Checkpoint Store interacting with Incremental State Handles can lead to loss of handles
[ https://issues.apache.org/jira/browse/FLINK-7268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137806#comment-16137806 ]

ASF GitHub Bot commented on FLINK-7268:
---------------------------------------

Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4410#discussion_r134647310

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -1044,10 +1049,23 @@ public boolean restoreLatestCheckpointedState(
                     throw new IllegalStateException("CheckpointCoordinator is shut down");
                 }

    -            // Recover the checkpoints
    -            completedCheckpointStore.recover(sharedStateRegistry);
    +            // We create a new shared state registry object, so that all pending async disposal requests from previous
    +            // runs will go against the old object (were they can do no harm).
    +            // This must happen under the checkpoint lock.
    +            sharedStateRegistry.close();
    +            sharedStateRegistry = sharedStateRegistryFactory.create(executor);
    +
    +            // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
    --- End diff --

    If we use highAvailabilityServices.getJobManagerLeaderRetriever(), Job Id is required.
    Can Job Id be obtained from JobVertexID?

> Zookeeper Checkpoint Store interacting with Incremental State Handles can lead to loss of handles
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7268
>                 URL: https://issues.apache.org/jira/browse/FLINK-7268
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0, 1.3.1, 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.2
>
>         Attachments: gce_rocks_incr_external_gs-more-logs.txt
>
>
> Release testing for Flink 1.3.2 has shown that this combination of features
> leads to these errors when using a very low restart delay:
> {code}
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: Item not found: aljoscha/state-machine-checkpoints-2/f26e2b4c6891f2a9e0c5e4ba014733c3/chk-3/b246db8c-4f25-483a-b1fc-234f4319004d
>   at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.getFileNotFoundException(GoogleCloudStorageExceptions.java:42)
>   at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:551)
>   at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:322)
>   at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:121)
>   at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1076)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
>   at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>   at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1281)
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1468)
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1324)
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRe
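The code comment in the quoted diff (create a new registry so that pending asynchronous disposals from earlier runs hit the old object) can be sketched in isolation. The following is a minimal model, not the Flink implementation: `RegistrySwapSketch`, `Registry` and `Coordinator` are hypothetical names, handles are plain strings, and it assumes a closed registry simply ignores further requests.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Minimal model of the "swap the registry on restore" idea; all names are hypothetical. */
final class RegistrySwapSketch {

    /** Tracks shared state handles; once closed, it ignores all further requests. */
    static final class Registry {
        private final Set<String> handles = new HashSet<>();
        private boolean closed;

        synchronized void register(String handle) {
            if (!closed) {
                handles.add(handle);
            }
        }

        /** Returns true only if the handle was actually discarded. */
        synchronized boolean discard(String handle) {
            return !closed && handles.remove(handle);
        }

        synchronized boolean contains(String handle) {
            return handles.contains(handle);
        }

        synchronized void close() {
            closed = true;
        }
    }

    /** Owns the current registry and replaces it on every restore, under a lock. */
    static final class Coordinator {
        private final Object lock = new Object();
        private Registry registry = new Registry();

        Registry currentRegistry() {
            synchronized (lock) {
                return registry;
            }
        }

        /** Closes the old registry, then registers the recovered handles in a fresh one. */
        void restore(Iterable<String> recoveredHandles) {
            synchronized (lock) {
                registry.close();
                registry = new Registry();
                for (String handle : recoveredHandles) {
                    registry.register(handle);
                }
            }
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        Registry old = coordinator.currentRegistry();
        old.register("chk-3/shared-file");
        coordinator.restore(Arrays.asList("chk-3/shared-file"));
        // A late disposal request from the previous run only reaches the closed registry.
        System.out.println("stale discard applied: " + old.discard("chk-3/shared-file"));
    }
}
```

The point of the design is that a stale request can only hold a reference to the closed object, so it cannot remove a handle that the restored run still needs.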
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137794#comment-16137794 ]

Jark Wu commented on FLINK-7446:
--------------------------------

In the long run, we can provide a new interface called {{DefinedWatermark}}, which has two methods: {{getRowtimeAttribute}} (which can only be an existing field) and {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked as deprecated.

In addition, we can provide some built-in watermark generators, such as {{AscendingTimestamp}} and {{BoundedOutOfOrderness}}.

What do you think?

> Support to define an existing field as the rowtime field for TableSource
> ------------------------------------------------------------------------
>
>                 Key: FLINK-7446
>                 URL: https://issues.apache.org/jira/browse/FLINK-7446
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field
> for a {{TableSource}}. But it would be helpful if we could support defining
> an existing field as the rowtime field. Just like when registering a
> DataStream, the rowtime field can be appended but can also replace an
> existing field.
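One possible shape of the proposal in the comment above, sketched in plain Java. Everything here is hypothetical: the comment only names the interface, its two methods and the two generators, so the `WatermarkGenerator` signature, the millisecond units and the example source are assumptions made for illustration, not a Flink API.

```java
/** Hypothetical sketch of the proposed interface; not an existing Flink API. */
final class DefinedWatermarkSketch {

    /** Assumed contract: observe a row's timestamp and return the current watermark. */
    interface WatermarkGenerator {
        long onTimestamp(long rowtimeMillis);
    }

    /** The interface named in the proposal, with its two methods. */
    interface DefinedWatermark {
        /** Name of an existing field that serves as the rowtime attribute. */
        String getRowtimeAttribute();

        WatermarkGenerator getWatermarkGenerator();
    }

    /** For timestamps that only increase: the watermark trails the maximum by 1 ms. */
    static final class AscendingTimestamp implements WatermarkGenerator {
        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public long onTimestamp(long rowtimeMillis) {
            maxTimestamp = Math.max(maxTimestamp, rowtimeMillis);
            return maxTimestamp - 1;
        }
    }

    /** Tolerates rows that arrive up to a fixed delay late. */
    static final class BoundedOutOfOrderness implements WatermarkGenerator {
        private final long delayMillis;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrderness(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        @Override
        public long onTimestamp(long rowtimeMillis) {
            maxTimestamp = Math.max(maxTimestamp, rowtimeMillis);
            return maxTimestamp - delayMillis;
        }
    }

    /** Example source definition: reuses the existing field "eventTime" as rowtime. */
    static final class ClickSource implements DefinedWatermark {
        @Override
        public String getRowtimeAttribute() {
            return "eventTime";
        }

        @Override
        public WatermarkGenerator getWatermarkGenerator() {
            return new BoundedOutOfOrderness(5L);
        }
    }

    public static void main(String[] args) {
        WatermarkGenerator generator = new ClickSource().getWatermarkGenerator();
        for (long ts : new long[] {100L, 98L, 110L}) {
            System.out.println("row " + ts + " -> watermark " + generator.onTimestamp(ts));
        }
    }
}
```

Pairing the attribute name with a generator in one interface keeps the choice of field and the watermark strategy together, which is what the comment suggests.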
[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()
[ https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137761#comment-16137761 ] ASF GitHub Bot commented on FLINK-7402: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4562 @tedyu @NicoK Thank you for your suggestions; that sounds good to me. Thanks. > Ineffective null check in NettyMessage#write() > -- > > Key: FLINK-7402 > URL: https://issues.apache.org/jira/browse/FLINK-7402 > Project: Flink > Issue Type: Bug > Components: Network >Reporter: Ted Yu >Priority: Minor > > Here is the null check in the finally block: > {code} > finally { > if (buffer != null) { > buffer.recycle(); > } > {code} > But buffer has already been dereferenced in the try block without a guard. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
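The pattern described in the issue can be reproduced in isolation. The following is a minimal sketch, assuming a stand-in `Buffer` class (not Flink's actual buffer type) and showing one possible way to make the contract explicit; it is not necessarily the change made in the pull request.

```java
import java.util.Objects;

/** Illustrates an ineffective null check in a finally block; Buffer is a hypothetical stand-in. */
final class NullCheckSketch {

    static final class Buffer {
        boolean recycled;

        int size() {
            return 42;
        }

        void recycle() {
            recycled = true;
        }
    }

    /** Problematic shape: the dereference in try fails before the check in finally can help. */
    static int writeUnguarded(Buffer buffer) {
        try {
            return buffer.size(); // throws NullPointerException when buffer is null
        } finally {
            if (buffer != null) { // only protects recycle(), not the dereference above
                buffer.recycle();
            }
        }
    }

    /** One possible remedy: validate up front, so the finally block needs no check. */
    static int writeGuarded(Buffer buffer) {
        Objects.requireNonNull(buffer, "buffer must not be null");
        try {
            return buffer.size();
        } finally {
            buffer.recycle();
        }
    }
}
```

Both variants still fail on a null argument; the difference is that the second states the requirement once instead of suggesting that null is a supported input.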
[GitHub] flink issue #4562: [FLINK-7402] Fix ineffective null check in NettyMessage#w...
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4562 @tedyu @NicoK Thank you for your suggestions; that sounds good to me. Thanks.
[jira] [Updated] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-7491: --- Component/s: Table API & SQL > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages
[ https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ChungChe Lai reassigned FLINK-6004: --- Assignee: ChungChe Lai > Allow FlinkKinesisConsumer to skip corrupted messages > - > > Key: FLINK-6004 > URL: https://issues.apache.org/jira/browse/FLINK-6004 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: ChungChe Lai > > It is quite clear from the fix of FLINK-3679 that in reality, users might > encounter corrupted messages from Kafka / Kinesis / generally external > sources when deserializing them. > The consumers should support simply skipping those messages, by letting the > deserialization schema return {{null}}, and checking {{null}} values within > the consumer. > This has been done for the Kafka consumer already. This ticket tracks the > improvement for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
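The approach described in the ticket (return `null` from the deserialization schema, check for `null` in the consumer) can be sketched without any connector dependency. The `Deserializer` interface and `consume` method below are hypothetical stand-ins for illustration, not the Kinesis connector's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Sketch of "return null to skip a corrupted record"; all names here are illustrative. */
final class SkipCorruptedSketch {

    /** Hypothetical stand-in for a deserialization schema. */
    interface Deserializer<T> {
        T deserialize(byte[] bytes);
    }

    /** Parses UTF-8 integers and signals corruption by returning null instead of throwing. */
    static final Deserializer<Integer> INT_PARSER = bytes -> {
        try {
            return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // corrupted record: ask the consumer to skip it
        }
    };

    /** Consumer loop: emits every record that deserialized to a non-null value. */
    static <T> List<T> consume(List<byte[]> records, Deserializer<T> schema) {
        List<T> emitted = new ArrayList<>();
        for (byte[] record : records) {
            T value = schema.deserialize(record);
            if (value != null) {
                emitted.add(value);
            }
        }
        return emitted;
    }
}
```

The trade-off of this convention is that `null` can no longer be a legitimate record value, which is usually acceptable for stream elements.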
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI&SQL
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137601#comment-16137601 ] Jacob Park commented on FLINK-7465: --- [~sunjincheng121] [~fhueske] May I recommend referring to Apache Spark's JIRA ticket for a similar problem?: https://issues.apache.org/jira/browse/SPARK-12818. See https://github.com/apache/spark/tree/branch-2.2/common/sketch/src/main/java/org/apache/spark/util/sketch for examples. Twitter also has great examples of probabilistic and algebraic data structures: https://github.com/twitter/algebird. [~sunjincheng121] If I understand the use-case correctly, you want to implement built-in approximation functions for counting distinct elements? I recommend not using a Bloom Filter. A Bloom Filter is a great candidate for querying set-membership and classification of elements as unique or duplicate. However, a Bloom Filter is a poor candidate for approximating the cardinality of elements of a set relative to the size of its multiset. To me, it sounds non-sensical to provide a maxKeyCount parameter. Use a CMS or a HyperLogLog. Please see slide 48 in http://lintool.github.io/UMD-courses/bigdata-2015-Spring/slides/session11.pdf. These slides are from Jimmy Lin (https://cs.uwaterloo.ca/~jimmylin/index.html); he provides a great overview of what probabilistic data structures to use for the desired query. > Add build-in BloomFilterCount on TableAPI&SQL > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. 
There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
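The add and query steps in the description above can be sketched in a few lines of Java. This is a minimal illustration, not the Hive implementation linked in the references: deriving the k positions from two base hashes (double hashing) is an assumption of this sketch, and the cardinality estimate uses the standard formula n ≈ -(m/k) · ln(1 - X/m), where X is the number of set bits.

```java
import java.util.BitSet;

/** Minimal Bloom filter sketch with m bits and k hash positions per element. */
final class BloomFilterSketch {
    private final BitSet bits;
    private final int m; // number of bits in the array
    private final int k; // number of hash positions per element

    BloomFilterSketch(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.m = m;
        this.k = k;
        this.bits = new BitSet(m);
    }

    /** Derives the i-th array position from two base hashes (an assumption of this sketch). */
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
        return Math.floorMod(h1 + i * h2, m);
    }

    /** Adds an element by setting the bits at all k positions. */
    void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    /** Returns false if the element is definitely absent, true if it may be present. */
    boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;
            }
        }
        return true;
    }

    /** Number of bits currently set to 1. */
    int setBits() {
        return bits.cardinality();
    }

    /** Approximate number of distinct elements added; grows unbounded as the filter fills up. */
    double estimateCardinality() {
        double x = bits.cardinality();
        return -((double) m / k) * Math.log(1.0 - x / m);
    }
}
```

Because adding the same element twice sets no new bits, the estimate counts distinct values rather than insertions. Its accuracy degrades as the array saturates, which is the concern raised in the comments on this issue.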
[jira] [Assigned] (FLINK-4004) Do not pass custom flink kafka connector properties to Kafka to avoid warnings
[ https://issues.apache.org/jira/browse/FLINK-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luffy Tsai reassigned FLINK-4004: - Assignee: Luffy Tsai > Do not pass custom flink kafka connector properties to Kafka to avoid warnings > -- > > Key: FLINK-4004 > URL: https://issues.apache.org/jira/browse/FLINK-4004 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Robert Metzger >Assignee: Luffy Tsai > > The FlinkKafkaConsumer has some custom properties, which we pass to the > KafkaConsumer as well (such as {{flink.poll-timeout}}). This causes Kafka to > log warnings about unused properties. > We should not pass Flink-internal properties to Kafka, to avoid those > warnings. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI&SQL
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137425#comment-16137425 ] Fabian Hueske commented on FLINK-7465: -- I'm sorry, I confused count-min sketches (for approximate group counts) and HyperLogLog (for approximate distinct counts). I assume the goal of the BloomFilterCount function is to (approximately) count the number of distinct values. In contrast to HyperLogLog, Bloom filters are not specifically designed for approximate distinct counting but for approximate membership testing. AFAIK, Bloom filters should be more precise for low distinct cardinalities, but HyperLogLog should provide much better results for larger cardinalities. IMO, [~jark]'s idea to split the bitmask into multiple long values is pretty nice. OTOH, multiple RocksDB point lookups might also be more expensive than a single lookup with a larger serialization payload (the deserialization logic for byte arrays shouldn't be very costly). > Add build-in BloomFilterCount on TableAPI&SQL > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. 
Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6465) support FIRST_VALUE on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137422#comment-16137422 ] ASF GitHub Bot commented on FLINK-6465: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4556 Hi @fhueske @twalthr @wuchong @shaoxuan-wang, I would appreciate it if you could review the PR. Thanks, jincheng > support FIRST_VALUE on Table API & SQL > -- > > Key: FLINK-6465 > URL: https://issues.apache.org/jira/browse/FLINK-6465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: sunjincheng > > {{FIRST_VALUE}} is an OVER WINDOW function. This JIRA will add > {{FIRST_VALUE}} function support to the Table API & SQL. > *Syntax:* > FIRST_VALUE ( [scalar_expression ] ) > OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] ) > [About OVER > WINDOWS|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/tableApi.html#over-windows] > scalar_expression > Is the value to be returned. scalar_expression can be a column, or other > arbitrary expression that results in a single value. Other analytic functions > are not permitted. > *NOTE:* > * {{FIRST_VALUE}} is used with an OVER WINDOW, e.g.: > {code} > SELECT A, B, FIRST_VALUE(C) OVER (ORDER BY E) AS firstValue FROM tab > {code} > * Retraction for OVER WINDOW is expensive (and currently not supported), so > this JIRA does not implement the retract logic of {{FIRST_VALUE}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
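To illustrate the semantics described above, here is a small Java sketch of FIRST_VALUE over a window that starts at the beginning of each partition. It is only a model of the expected results, not the Table API implementation: rows are assumed to arrive already sorted by the ORDER BY column, each row is a hypothetical `{partitionKey, value}` pair, and null values are not handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Models FIRST_VALUE(value) OVER (PARTITION BY key ORDER BY ...) for pre-sorted rows. */
final class FirstValueSketch {

    /**
     * For each input row {partitionKey, value}, returns the first value seen in that partition.
     * Because the window start never moves, no retraction is needed in this model.
     */
    static List<String> firstValueOver(List<String[]> orderedRows) {
        Map<String, String> firstPerPartition = new HashMap<>();
        List<String> result = new ArrayList<>();
        for (String[] row : orderedRows) {
            // Only the first row of a partition sets the accumulator (assumes non-null values).
            firstPerPartition.putIfAbsent(row[0], row[1]);
            result.add(firstPerPartition.get(row[0]));
        }
        return result;
    }
}
```

The accumulator holds a single value per partition, which is why the function is cheap as long as rows never leave the window.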
[GitHub] flink issue #4556: [FLINK-6465][table]support FIRST_VALUE on Table API & SQL
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4556 Hi @fhueske @twalthr @wuchong @shaoxuan-wang, I would appreciate it if you could review the PR. Thanks, jincheng
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137358#comment-16137358 ] Fabian Hueske commented on FLINK-7446: -- I agree that the watermark generation for TableSource's should be reworked. However, I'm not sure if we will achieve that before the next release. If we want to support to define an event-time indicator as an existing field and want to be sure that the implementation of the {{TableSource}} is correct, we would have to compare the {{StreamRecord}} timestamp with the existing attribute at runtime for each record. However, since {{TableSource}} is an interface for rather experienced users (rather DBA than DB user), I'd be fine to omit this safety check. In the long run I agree with [~xccui]. We should have a watermark generator interface that works on an existing field with built-in implementations for the common cases. > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we can support to define an > existing field as the rowtime field. Just like registering a DataStream, the > rowtime field can be appended but also can replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG
[ https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137343#comment-16137343 ] Fabian Hueske commented on FLINK-7471: -- I understand the motivation for this issue and agree that it would be good to support non-retractable UDAGGs for bounded OVER aggregations. My question is rather what happens if we have a mix of retractable and non-retractable aggregation functions. Are the retractable functions computed efficiently or not? Treating all aggregation functions as non-retractable might introduce a significant and unnecessary performance penalty. Think of a query with several retractable aggregations to which one non-retractable UDAGG is added. Handling retractable and non-retractable aggregation functions separately would mean adding quite a bit of additional logic. For example, we would need to extend the {{GeneratedAggregations}} interface to account for "empty" fields in the output record, which would be filled by the other aggregation logic. > Improve bounded OVER support non-retract method AGG > --- > > Key: FLINK-7471 > URL: https://issues.apache.org/jira/browse/FLINK-7471 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently, BOUNDED OVER WINDOW only supports AGGs that have a {{retract}} method. This > JIRA will add support for AGGs without a {{retract}} method. > What do you think? [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
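The performance concern in this comment can be made concrete with a small Java sketch over a bounded window of the last n rows. It is an illustration only, not Flink's generated aggregation code: SUM stands in for a retractable aggregate, and MAX is treated here as an aggregate without a retract method that has to be recomputed from the buffered rows.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Contrasts retractable and non-retractable aggregation over the last n rows. */
final class BoundedOverSketch {

    /** Retractable SUM: accumulate the new row, retract the expired one, constant work per row. */
    static long[] slidingSum(long[] values, int n) {
        Deque<Long> window = new ArrayDeque<>();
        long sum = 0;
        long[] out = new long[values.length];
        for (int i = 0; i < values.length; i++) {
            window.addLast(values[i]);
            sum += values[i]; // accumulate
            if (window.size() > n) {
                sum -= window.removeFirst(); // retract the row that left the window
            }
            out[i] = sum;
        }
        return out;
    }

    /** Aggregate without retract: recompute from all buffered rows, work proportional to n per row. */
    static long[] slidingMax(long[] values, int n) {
        Deque<Long> window = new ArrayDeque<>();
        long[] out = new long[values.length];
        for (int i = 0; i < values.length; i++) {
            window.addLast(values[i]);
            if (window.size() > n) {
                window.removeFirst();
            }
            long max = Long.MIN_VALUE;
            for (long v : window) {
                max = Math.max(max, v); // full re-accumulation over the window
            }
            out[i] = max;
        }
        return out;
    }
}
```

If every aggregate in a query were handled like `slidingMax`, the retractable ones would lose their constant-time update, which is the penalty the comment warns about.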
[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137250#comment-16137250 ] Jacob Park commented on FLINK-7398: --- +1 for the checkstyle rule. > Table API operators/UDFs must not store Logger > -- > > Key: FLINK-7398 > URL: https://issues.apache.org/jira/browse/FLINK-7398 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Haohui Mai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > Attachments: Example.png > > > Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in > an instance field (c.f. > https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39). > This means that the {{Logger}} will be serialised with the UDF and sent to > the cluster. This in itself does not sound right and leads to problems when > the slf4j configuration on the Client is different from the cluster > environment. > This is an example of a user running into that problem: > https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. > Here, they have Logback on the client but the Logback classes are not > available on the cluster and so deserialisation of the UDFs fails with a > {{ClassNotFoundException}}. 
> This is a rough list of the involved classes: > {code} > src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28: > val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double]) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28: > val LOGICAL: Convention = new Convention.Impl("LOGICAL", > classOf[FlinkLogicalRel]) > src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38: val > LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList( > src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > 
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48: > val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/sca
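The underlying mechanism of the issue (an instance field becomes part of a function's serialized form, a static field does not) can be shown with plain Java serialization. This is a simplified sketch: `FakeLogger` is a hypothetical, deliberately non-serializable stand-in, so the failure appears at serialization time, whereas in the reported case the logger serializes on the client and fails on the cluster with a `ClassNotFoundException`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Shows why a logger stored in an instance field travels with a serialized function. */
final class LoggerFieldSketch {

    /** Hypothetical logger stand-in; intentionally not Serializable. */
    static final class FakeLogger {
        void info(String message) {
            // no-op for the purpose of this sketch
        }
    }

    /** Problematic shape: the logger is an instance field and part of the serialized state. */
    static final class FunctionWithInstanceLogger implements Serializable {
        private final FakeLogger log = new FakeLogger();
    }

    /** Common remedy: static fields are not written by Java serialization. */
    static final class FunctionWithStaticLogger implements Serializable {
        private static final FakeLogger LOG = new FakeLogger();
    }

    /** Returns true if the object can be written with Java serialization. */
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            return false; // includes NotSerializableException
        }
    }
}
```

Marking the field `transient` would also keep it out of the serialized form, but the field would then be null after deserialization unless it is re-initialized.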
[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.
[ https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137249#comment-16137249 ] Jamie Grier commented on FLINK-4947: [~gaborhermann] It's more than just that but yes I do suggest that you should be able to override what's in the config file on the command line. More importantly though is that all *config* should be configurable via flink-conf.yaml. We shouldn't add features that are only configurable from the *user code*. An example of this used to be the RocksDB state backend. If you wanted to use that backend and configure it in "async" mode you had to put this in application code, but that's not great for separation of concerns between application developers and ops/platform teams. I know this isn't black-and-white but we should try to clearly separate configuration from user code by putting everything in flink-conf.yaml. We should *also* make it possible to override any of those values on the command line when submitting a job. > Make all configuration possible via flink-conf.yaml and CLI. > > > Key: FLINK-4947 > URL: https://issues.apache.org/jira/browse/FLINK-4947 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Jamie Grier > > I think it's important to make all configuration possible via the > flink-conf.yaml and the command line. > As an example: To enable "externalizedCheckpoints" you must actually call > the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from > your Flink program. > Another example of this would be configuring the RocksDB state backend. > I think it important to make deployment flexible and easy to build tools > around. For example, the infrastructure teams that make these configuration > decisions and provide tools for deploying Flink apps, will be different from > the teams deploying apps. The team writing apps should not have to set all > of this lower level configuration up in their programs. 
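The layering proposed here (defaults in flink-conf.yaml, overridable on the command line) can be sketched as a simple merge. This is an illustration under assumptions, not Flink's configuration loader: the parser only understands flat `key: value` lines, the `-Dkey=value` override syntax is assumed for the example, and the keys used in the test are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of "config file first, command line overrides second". */
final class LayeredConfigSketch {

    /** Parses flat "key: value" lines, a simplified stand-in for flink-conf.yaml. */
    static Map<String, String> parseConfFile(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            int colon = line.indexOf(':');
            if (line.isEmpty() || line.startsWith("#") || colon < 0) {
                continue; // skip blanks, comments, and malformed lines
            }
            conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return conf;
    }

    /** Parses overrides of the assumed form -Dkey=value and ignores all other arguments. */
    static Map<String, String> parseCliOverrides(String[] args) {
        Map<String, String> overrides = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith("-D") && eq > 2) {
                overrides.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return overrides;
    }

    /** Effective configuration: file values, with command-line values taking precedence. */
    static Map<String, String> effective(Map<String, String> file, Map<String, String> cli) {
        Map<String, String> merged = new LinkedHashMap<>(file);
        merged.putAll(cli);
        return merged;
    }
}
```

With this split, an ops team owns the file, a deployment tool supplies overrides per job, and the application code sets neither.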
[jira] [Closed] (FLINK-7123) Support timesOrMore in CEP
[ https://issues.apache.org/jira/browse/FLINK-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-7123. - Resolution: Fixed Merged with 9995588c83536e04aef3425757a008ba4c78dbde > Support timesOrMore in CEP > -- > > Key: FLINK-7123 > URL: https://issues.apache.org/jira/browse/FLINK-7123 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > The CEP API should provide API such as timesOrMore(#ofTimes) for quantifier > {n,}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
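The quantifier {n,} mentioned in the description means "n or more occurrences". As an analogy only, the same quantifier in `java.util.regex` behaves as sketched below; this is not the CEP API, events are modeled as single characters, and CEP-specific aspects such as contiguity and time windows are ignored.

```java
import java.util.regex.Pattern;

/** Regex analogy for the {n,} quantifier ("n times or more"); not Flink CEP code. */
final class TimesOrMoreSketch {

    /** Returns true if the whole sequence consists of the given event repeated at least n times. */
    static boolean matchesAtLeast(String events, char event, int n) {
        String atom = "(?:" + Pattern.quote(String.valueOf(event)) + ")";
        Pattern pattern = Pattern.compile(atom + "{" + n + ",}");
        return pattern.matcher(events).matches();
    }
}
```

For n = 2, the sequences "aa" and "aaa" match while "a" does not, which is the behavior a `timesOrMore(2)` quantifier is meant to express.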
[jira] [Commented] (FLINK-7123) Support timesOrMore in CEP
[ https://issues.apache.org/jira/browse/FLINK-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137120#comment-16137120 ] ASF GitHub Bot commented on FLINK-7123: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4523 > Support timesOrMore in CEP > -- > > Key: FLINK-7123 > URL: https://issues.apache.org/jira/browse/FLINK-7123 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > The CEP API should provide API such as timesOrMore(#ofTimes) for quantifier > {n,}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4523: [FLINK-7123] [cep] Support timesOrMore in CEP
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4523
[jira] [Created] (FLINK-7493) Create Pinot Connector
Zhenqiu Huang created FLINK-7493: Summary: Create Pinot Connector Key: FLINK-7493 URL: https://issues.apache.org/jira/browse/FLINK-7493 Project: Flink Issue Type: New Feature Reporter: Zhenqiu Huang Assignee: Zhenqiu Huang Add pinot connector for streaming ingestion and batch segment file push. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7492) Memsql Connector
Zhenqiu Huang created FLINK-7492: Summary: Memsql Connector Key: FLINK-7492 URL: https://issues.apache.org/jira/browse/FLINK-7492 Project: Flink Issue Type: New Feature Reporter: Zhenqiu Huang Assignee: Zhenqiu Huang Add an output connector for both streaming and batch ingestion for Memsql. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
Shuyi Chen created FLINK-7491: - Summary: Support COLLECT Aggregate function in Flink SQL Key: FLINK-7491 URL: https://issues.apache.org/jira/browse/FLINK-7491 Project: Flink Issue Type: New Feature Reporter: Shuyi Chen Assignee: Shuyi Chen -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137075#comment-16137075 ] Till Rohrmann commented on FLINK-4319: -- Hi Bowen, development of Flip-6 has been under way for quite some time and we have already made some really good progress. We hope to get the rework done with the Flink 1.4 release. What we are currently working on are the REST endpoints, reaching functional parity with the existing code base, and thorough tests of the new components. Every helping hand is highly welcome :-) > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Till Rohrmann > Labels: flip-6 > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-4319: Assignee: Till Rohrmann > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Till Rohrmann > Labels: flip-6 > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137064#comment-16137064 ] Bowen Li commented on FLINK-4319: - Hi guys, what's the schedule for this task? When are we starting, and what's the target release? > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen > Labels: flip-6 > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7490) UDF Agg throws Exception when flink-table is loaded with AppClassLoader
Miguel Rui Pereira Marques created FLINK-7490:

Summary: UDF Agg throws Exception when flink-table is loaded with AppClassLoader
Key: FLINK-7490
URL: https://issues.apache.org/jira/browse/FLINK-7490
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Affects Versions: 1.3.1
Reporter: Miguel Rui Pereira Marques

When a UDF aggregation for the Batch Table API is defined in the FlinkUserCodeClassLoader and the Table API itself is loaded in the AppClassLoader (the jar is included in the lib directory), this exception is triggered:

{panel:title=Exception}
java.lang.Exception: The user defined 'open()' method caused an exception: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.aggregate.DataSetAggFunction.compile(DataSetAggFunction.scala:35)
    at org.apache.flink.table.runtime.aggregate.DataSetAggFunction.open(DataSetAggFunction.scala:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
    ... 3 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 13: Cannot determine simple type name "org"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
    ...
{panel}

Upon inspecting the code, I think this may be due to the use of 'getClass.getClassLoader' instead of 'getRuntimeContext.getUserCodeClassLoader' as the argument to 'compile' in the method 'open' of class org.apache.flink.table.runtime.aggregate.DataSetAggFunction.
[jira] [Commented] (FLINK-7123) Support timesOrMore in CEP
[ https://issues.apache.org/jira/browse/FLINK-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137030#comment-16137030 ] ASF GitHub Bot commented on FLINK-7123: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4523 LGTM, merging this > Support timesOrMore in CEP > -- > > Key: FLINK-7123 > URL: https://issues.apache.org/jira/browse/FLINK-7123 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > The CEP API should provide API such as timesOrMore(#ofTimes) for quantifier > {n,}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137006#comment-16137006 ]

ASF GitHub Bot commented on FLINK-7040:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134529419

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import java.util.Collections;
    +import java.util.Set;
    +
    +/**
    + * This class links {@link RequestBody}s to {@link ResponseBody}s types and contains meta-data required for their http headers.
    + *
    + * Implementations must be state-less.
    + *
    + * @param request type
    + * @param response type
    + */
    +public interface MessageHeaders {
    --- End diff --

    Alright, maybe this could be documented as part of the `@param ` description.

> Flip-6 client-cluster communication
> -----------------------------------
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management, Mesos
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the path under which the savepoint was stored? Maybe always having to specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new notifications from the execution of the given job/Opens WebSocket to receive notifications
> The first four REST calls will be served by the REST endpoint running in the application master/cluster entrypoint. The other calls will be served by a REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.
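The list of REST calls in the issue description can be written down as a small table in code. This is a rough sketch, not the Flink implementation: the names `RestCallSketch`, `RestCall` and `Endpoint` are hypothetical, and only the HTTP verbs and the "first four vs. the rest" split are taken from the issue text.

```java
import java.util.ArrayList;
import java.util.List;

public class RestCallSketch {

    /** Where a call is served, following the split described in the issue. */
    public enum Endpoint { CLUSTER_ENTRYPOINT, JOB_MANAGER }

    /** The calls named in the issue, with the HTTP verb given there. */
    public enum RestCall {
        LIST_JOBS("GET", Endpoint.CLUSTER_ENTRYPOINT),
        SUBMIT_JOB("POST", Endpoint.CLUSTER_ENTRYPOINT),
        LOOKUP_JOB_LEADER("GET", Endpoint.CLUSTER_ENTRYPOINT),
        GET_JOB_STATUS("GET", Endpoint.CLUSTER_ENTRYPOINT),
        CANCEL_JOB("PUT", Endpoint.JOB_MANAGER),
        STOP_JOB("PUT", Endpoint.JOB_MANAGER),
        TAKE_SAVEPOINT("POST", Endpoint.JOB_MANAGER),
        GET_KV_STATE("GET", Endpoint.JOB_MANAGER),
        // The issue also mentions a WebSocket variant; only the polling GET is modelled here.
        POLL_NOTIFICATIONS("GET", Endpoint.JOB_MANAGER);

        public final String method;
        public final Endpoint servedBy;

        RestCall(String method, Endpoint servedBy) {
            this.method = method;
            this.servedBy = servedBy;
        }
    }

    /** Lists the calls served by the given endpoint, in declaration order. */
    public static List<RestCall> servedBy(Endpoint endpoint) {
        List<RestCall> result = new ArrayList<>();
        for (RestCall call : RestCall.values()) {
            if (call.servedBy == endpoint) {
                result.add(call);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Endpoint endpoint : Endpoint.values()) {
            System.out.println(endpoint + " serves " + servedBy(endpoint));
        }
    }
}
```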
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134528391

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ParameterMapper.java ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class is used to map query/path {@link Parameter}s to their actual value.
    + */
    +public abstract class ParameterMapper {
    +
    +    /**
    +     * Maps the given query {@link Parameter}s to their actual value.
    +     *
    +     * @param queryParameters parameters to map
    +     * @return map containing the parameters and their associated value
    +     */
    +    public Map mapQueryParameters(Set queryParameters) {
    +        return Collections.emptyMap();
    +    }
    +
    +    /**
    +     * Maps the given path {@link Parameter}s to their actual value.
    +     *
    +     * @param pathParameters parameters to map
    +     * @return map containing the parameters and their associated value
    +     */
    +    public Map mapPathParameters(Set pathParameters) {
    +        return Collections.emptyMap();
    +    }
    +
    +    /**
    +     * Resolves the given URL (e.g "jobs/:jobid") using the given path/query parameters.
    +     *
    +     * @param genericUrl URL to resolve
    +     * @param pathParameters path parameters
    +     * @param queryParameters query parameters
    +     * @return resolved url, e.g "/jobs/1234?state=running"
    +     */
    +    public static String resolveUrl(String genericUrl, Map pathParameters, Map queryParameters) {
    +        StringBuilder sb = new StringBuilder(genericUrl);
    +
    +        pathParameters.forEach((parameter, value) -> {
    +            int start = sb.indexOf(":" + parameter.getKey());
    +            sb.replace(start, start + parameter.getKey().length() + 1, value);
    +        });
    --- End diff --

    But we could, for example, count how many placeholders there are in the request URL and check that all of them have been replaced, couldn't we? Or is it also a valid request URL if a placeholder has not been replaced with a concrete value?
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134523787

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ParameterMapper.java ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class is used to map query/path {@link Parameter}s to their actual value.
    + */
    +public abstract class ParameterMapper {
    +
    +    /**
    +     * Maps the given query {@link Parameter}s to their actual value.
    +     *
    +     * @param queryParameters parameters to map
    +     * @return map containing the parameters and their associated value
    +     */
    +    public Map mapQueryParameters(Set queryParameters) {
    +        return Collections.emptyMap();
    +    }
    +
    +    /**
    +     * Maps the given path {@link Parameter}s to their actual value.
    +     *
    +     * @param pathParameters parameters to map
    +     * @return map containing the parameters and their associated value
    +     */
    +    public Map mapPathParameters(Set pathParameters) {
    +        return Collections.emptyMap();
    +    }
    +
    +    /**
    +     * Resolves the given URL (e.g "jobs/:jobid") using the given path/query parameters.
    +     *
    +     * @param genericUrl URL to resolve
    +     * @param pathParameters path parameters
    +     * @param queryParameters query parameters
    +     * @return resolved url, e.g "/jobs/1234?state=running"
    +     */
    +    public static String resolveUrl(String genericUrl, Map pathParameters, Map queryParameters) {
    +        StringBuilder sb = new StringBuilder(genericUrl);
    +
    +        pathParameters.forEach((parameter, value) -> {
    +            int start = sb.indexOf(":" + parameter.getKey());
    +            sb.replace(start, start + parameter.getKey().length() + 1, value);
    +        });
    --- End diff --

    We can't check that in this static method, since we neither know the original parameters nor can search for leftover parameters (`:` is a valid character in a URL). The only place where we could do it is in the client, but it wouldn't be able to tell whether the replaced parts are correct anyway, so there might not be much value in checking.
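One check that is possible inside the method goes in the other direction: verifying that every supplied path parameter actually has a placeholder. In the quoted diff, `sb.indexOf` returns -1 for a missing placeholder and `StringBuilder.replace` then throws a `StringIndexOutOfBoundsException`. The sketch below is a hedged variant and not the code from the pull request: it uses plain `String` keys instead of the `Parameter` type, the class name `UrlResolverSketch` is hypothetical, URL encoding is left out, and it does not attempt to detect leftover placeholders, for the reason given in the comment above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UrlResolverSketch {

    /**
     * Replaces ":key" placeholders with path parameter values and appends query parameters.
     * Fails with a descriptive error if a path parameter has no placeholder in the URL.
     */
    public static String resolveUrl(
            String genericUrl,
            Map<String, String> pathParameters,
            Map<String, String> queryParameters) {
        StringBuilder sb = new StringBuilder(genericUrl);

        for (Map.Entry<String, String> entry : pathParameters.entrySet()) {
            String placeholder = ":" + entry.getKey();
            int start = sb.indexOf(placeholder);
            if (start < 0) {
                throw new IllegalArgumentException(
                    "No placeholder '" + placeholder + "' in URL '" + genericUrl + "'");
            }
            sb.replace(start, start + placeholder.length(), entry.getValue());
        }

        // First query parameter is introduced by '?', all following ones by '&'.
        char separator = '?';
        for (Map.Entry<String, String> entry : queryParameters.entrySet()) {
            sb.append(separator).append(entry.getKey()).append('=').append(entry.getValue());
            separator = '&';
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> path = new LinkedHashMap<>();
        path.put("jobid", "1234");
        Map<String, String> query = new LinkedHashMap<>();
        query.put("state", "running");
        System.out.println(resolveUrl("/jobs/:jobid", path, query));
    }
}
```

Note that a plain `indexOf` still matches a placeholder that is a prefix of a longer one (for example `:job` inside `:jobid`); the sketch keeps that behaviour from the original.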
[jira] [Assigned] (FLINK-6378) Implement FLIP-6 Flink-on-Mesos
[ https://issues.apache.org/jira/browse/FLINK-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright reassigned FLINK-6378: --- Assignee: Eron Wright > Implement FLIP-6 Flink-on-Mesos > --- > > Key: FLINK-6378 > URL: https://issues.apache.org/jira/browse/FLINK-6378 > Project: Flink > Issue Type: New Feature > Components: Mesos >Reporter: Eron Wright >Assignee: Eron Wright > Labels: flip-6 > > This is the parent issue for implementing Flink on Mesos using the new FLIP-6 > architecture. > This covers individual jobs running as Mesos frameworks, where the framework > and job lifetime are coupled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134517671

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Simple container for the response of a handler, that contains either a response of type {@code P} if the incoming
    + * request was handled successfully, otherwise it contains an error message and an associated error code.
    + *
    + * @param type of successful response
    + */
    +public final class HandlerResponse {
    --- End diff --

    Yes it would require casting the response. I think this is more explicit and prevents the user from shooting himself in the foot.
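The Javadoc in the quoted diff describes a container that holds either a successful response of type `P` or an error message with an error code. A minimal sketch of such an either-style container is given below. It is an assumption about the shape, not the class from the pull request: the name `HandlerResponseSketch`, the factory and accessor names, and the use of a plain `int` instead of Netty's `HttpResponseStatus` are all made up to keep the example self-contained.

```java
import java.util.Objects;

public final class HandlerResponseSketch<P> {

    private final P response;          // non-null only for successful responses
    private final String errorMessage; // non-null only for failed responses
    private final int errorCode;       // meaningful only for failed responses

    private HandlerResponseSketch(P response, String errorMessage, int errorCode) {
        this.response = response;
        this.errorMessage = errorMessage;
        this.errorCode = errorCode;
    }

    /** Creates a container for a successfully handled request. */
    public static <P> HandlerResponseSketch<P> successful(P response) {
        return new HandlerResponseSketch<>(Objects.requireNonNull(response), null, 0);
    }

    /** Creates a container for a failed request. */
    public static <P> HandlerResponseSketch<P> error(String errorMessage, int errorCode) {
        return new HandlerResponseSketch<>(null, Objects.requireNonNull(errorMessage), errorCode);
    }

    public boolean wasSuccessful() {
        return response != null;
    }

    /** Returns the response; calling this on an error container is a programming error. */
    public P getResponse() {
        if (!wasSuccessful()) {
            throw new IllegalStateException("Request failed: " + errorMessage);
        }
        return response;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    public int getErrorCode() {
        return errorCode;
    }

    public static void main(String[] args) {
        HandlerResponseSketch<String> ok = HandlerResponseSketch.successful("done");
        HandlerResponseSketch<String> failed = HandlerResponseSketch.error("job not found", 404);
        System.out.println(ok.wasSuccessful() + " " + ok.getResponse());
        System.out.println(failed.wasSuccessful() + " " + failed.getErrorCode());
    }
}
```

Forcing callers to check `wasSuccessful()` before reading the payload is one way to make the two cases explicit at the call site.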
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134517389 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ParameterMapper; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * This client is the counter-part to the {@link RestServerEndpoint}. + */ +public class RestClientEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class); + + private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + private final String configuredTargetAddress; + private final int configuredTargetPort; + private final SSLEngine sslEngine; + + private Bootstrap bootstrap; + + private final ClientHandler handler = new ClientHandler(); + + private CompletableFuture lastFuture = CompletableFuture.completedFuture(null); + + private final Executor directExecutor = Executors.directExecutor(); + + public RestClientEndpoint(
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136925#comment-16136925 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134512942 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Collections; +import java.util.Set; + +/** + * This class links {@link RequestBody}s to {@link ResponseBody}s types and contains meta-data required for their http headers. + * + * Implementations must be state-less. + * + * @param request type + * @param response type + */ +public interface MessageHeaders { --- End diff -- The idea is to have the headers define which parameter mapper is used, so that an unrelated mapper cannot be passed in.
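The point of the review comment, that the headers themselves should fix which parameter mapper applies, can be illustrated with a small sketch. This is not the code from the pull request: the `Headers` interface, `JobIdMapper`, `JobStatusHeaders`, the `resolveUrl` method and the URL template are all hypothetical names made up for illustration. It only shows how a type parameter on the headers lets the compiler reject an unrelated mapper.

```java
import java.util.Map;

// Hypothetical sketch, not Flink's API: headers carry the mapper type U.
final class HeadersMapperSketch {

    /** Stand-in for a parameter mapper: resolves path parameters into a URL. */
    interface ParameterMapper {
        String resolveUrl(String urlTemplate, Map<String, String> pathParameters);
    }

    /** Headers fix the mapper type at compile time via the type parameter U. */
    interface Headers<U extends ParameterMapper> {
        String getTargetUrlTemplate();

        U getParameterMapper();
    }

    /** Example mapper that replaces ":name" placeholders with values. */
    static final class JobIdMapper implements ParameterMapper {
        @Override
        public String resolveUrl(String urlTemplate, Map<String, String> pathParameters) {
            String url = urlTemplate;
            for (Map.Entry<String, String> entry : pathParameters.entrySet()) {
                url = url.replace(":" + entry.getKey(), entry.getValue());
            }
            return url;
        }
    }

    /** Example headers; the URL template is an assumption for illustration. */
    static final class JobStatusHeaders implements Headers<JobIdMapper> {
        @Override
        public String getTargetUrlTemplate() {
            return "/jobs/:jobid/status";
        }

        @Override
        public JobIdMapper getParameterMapper() {
            return new JobIdMapper();
        }
    }

    /** Callers never pass a mapper; it always comes from the headers. */
    static <U extends ParameterMapper> String buildUrl(Headers<U> headers, Map<String, String> pathParameters) {
        return headers.getParameterMapper().resolveUrl(headers.getTargetUrlTemplate(), pathParameters);
    }
}
```

Because `buildUrl` takes only the headers, there is no call site at which a mismatched mapper could be supplied.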
> Flip-6 client-cluster communication > --- > > Key: FLINK-7040 > URL: https://issues.apache.org/jira/browse/FLINK-7040 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Mesos >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: flip-6 > > With the new Flip-6 architecture, the client will communicate with the > cluster in a RESTful manner. > The cluster shall support the following REST calls: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Lookup job leader (GET): Gets the JM leader for the given job > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Cancel job (PUT): Cancel the given job > * Stop job (PUT): Stops the given job > * Take savepoint (POST): Take savepoint for given job (How to return the > path under which the savepoint was stored? Maybe always having to > specify a path) > * Get KV state (GET): Gets the KV state for the given job and key (Queryable > state) > * Poll/subscribe to notifications for job (GET, WebSocket): Polls new > notifications from the execution of the given job/Opens WebSocket to receive > notifications > The first four REST calls will be served by the REST endpoint running in the > application master/cluster entrypoint. The other calls will be served by a > REST endpoint running alongside the JobManager. > Detailed information about different implementations and their pros and cons > can be found in this document: > https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing > The implementation will most likely be Netty based. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
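The REST calls listed in the issue description can be summarised as a small table in code. The sketch below is only a reading aid: the enum and constant names are invented here, and the boolean simply records the description's statement that the first four calls are served by the cluster entrypoint and the rest by an endpoint next to the JobManager. No URLs are given because the description does not define any.

```java
// Hypothetical summary of the calls in the issue description; names are illustrative.
final class RestCallsSketch {

    enum HttpMethod { GET, POST, PUT }

    enum RestCall {
        LIST_JOBS(HttpMethod.GET, true),
        SUBMIT_JOB(HttpMethod.POST, true),
        LOOKUP_JOB_LEADER(HttpMethod.GET, true),
        GET_JOB_STATUS(HttpMethod.GET, true),
        CANCEL_JOB(HttpMethod.PUT, false),
        STOP_JOB(HttpMethod.PUT, false),
        TAKE_SAVEPOINT(HttpMethod.POST, false),
        GET_KV_STATE(HttpMethod.GET, false),
        POLL_NOTIFICATIONS(HttpMethod.GET, false);

        private final HttpMethod method;
        // true: served by the application master/cluster entrypoint endpoint
        // false: served by the endpoint running alongside the JobManager
        private final boolean servedByClusterEntrypoint;

        RestCall(HttpMethod method, boolean servedByClusterEntrypoint) {
            this.method = method;
            this.servedByClusterEntrypoint = servedByClusterEntrypoint;
        }

        HttpMethod getMethod() {
            return method;
        }

        boolean isServedByClusterEntrypoint() {
            return servedByClusterEntrypoint;
        }
    }

    /** Counts the calls that the description assigns to the cluster entrypoint. */
    static int countClusterEntrypointCalls() {
        int count = 0;
        for (RestCall call : RestCall.values()) {
            if (call.isServedByClusterEntrypoint()) {
                count++;
            }
        }
        return count;
    }
}
```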
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136922#comment-16136922 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134512163 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ParameterMapper; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * This client is the counter-part to the {@link RestServerEndpoint}. + */ +public class RestClientEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class); + + private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + private final String configuredTargetAddress; + private final int configuredTargetPort; + private final SSLEngine sslEngine; + + private Bootstrap bootstrap; + + private final ClientHandle
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134512163 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ParameterMapper; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * This client is the counter-part to the {@link RestServerEndpoint}. + */ +public class RestClientEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class); + + private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + private final String configuredTargetAddress; + private final int configuredTargetPort; + private final SSLEngine sslEngine; + + private Bootstrap bootstrap; + + private final ClientHandler handler = new ClientHandler(); + + private CompletableFuture lastFuture = CompletableFuture.completedFuture(null); + + private final Executor directExecutor = Executors.directExecutor(); + + public RestClientEndpoint(RestCl
[jira] [Updated] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-4048: Issue Type: Bug (was: Sub-task) Parent: (was: FLINK-3957) > Remove Hadoop dependencies from ExecutionEnvironment > > > Key: FLINK-4048 > URL: https://issues.apache.org/jira/browse/FLINK-4048 > Project: Flink > Issue Type: Bug > Components: DataSet API >Reporter: Robert Metzger >Assignee: Aljoscha Krettek > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-4048: --- Assignee: Aljoscha Krettek > Remove Hadoop dependencies from ExecutionEnvironment > > > Key: FLINK-4048 > URL: https://issues.apache.org/jira/browse/FLINK-4048 > Project: Flink > Issue Type: Sub-task > Components: DataSet API >Reporter: Robert Metzger >Assignee: Aljoscha Krettek > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4048) Remove Hadoop dependencies from ExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136915#comment-16136915 ] Aljoscha Krettek commented on FLINK-4048: - I was experimenting with this as part of FLINK-2268 and noticed that the Hadoop-related methods on {{ExecutionEnvironment}} are {{@PublicEvolving}}, so I went ahead and removed them. Seems to work smoothly so far. > Remove Hadoop dependencies from ExecutionEnvironment > > > Key: FLINK-4048 > URL: https://issues.apache.org/jira/browse/FLINK-4048 > Project: Flink > Issue Type: Sub-task > Components: DataSet API >Reporter: Robert Metzger > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136912#comment-16136912 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134510178 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +/** + * Simple container for the response of a handler, that contains either a response of type {@code P} if the incoming + * request was handled successfully, otherwise it contains an error message and an associated error code. 
+ * + * @param type of successful response + */ +public final class HandlerResponse { --- End diff -- But even if we split this interface, a user who gets a `HandlerResponse` still has to check whether it was successful or not. Unless you suggest having an empty `HandlerResponse` interface and forcing users to do an instanceof+cast every time. > Flip-6 client-cluster communication > --- > > Key: FLINK-7040 > URL: https://issues.apache.org/jira/browse/FLINK-7040 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Mesos >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: flip-6 > > With the new Flip-6 architecture, the client will communicate with the > cluster in a RESTful manner. > The cluster shall support the following REST calls: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Lookup job leader (GET): Gets the JM leader for the given job > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Cancel job (PUT): Cancel the given job > * Stop job (PUT): Stops the given job > * Take savepoint (POST): Take savepoint for given job (How to return the > path under which the savepoint was stored? Maybe always having to > specify a path) > * Get KV state (GET): Gets the KV state for the given job and key (Queryable > state) > * Poll/subscribe to notifications for job (GET, WebSocket): Polls new > notifications from the execution of the given job/Opens WebSocket to receive > notifications > The first four REST calls will be served by the REST endpoint running in the > application master/cluster entrypoint. The other calls will be served by a > REST endpoint running alongside the JobManager. 
> Detailed information about different implementations and their pros and cons > can be found in this document: > https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing > The implementation will most likely be Netty based. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
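The trade-off raised in the `HandlerResponse` comment, a single container that the caller queries for success versus separate types that require an instanceof check and a cast, can be seen in a minimal sketch of the single-container variant. The class below is an assumption written for illustration from the Javadoc quoted in the diff (a response of type `P` on success, otherwise an error message and error code); the method names and the use of a plain `int` error code are not taken from the pull request.

```java
import java.util.Objects;

// Hypothetical sketch of an either-style response container; not the PR's class.
final class HandlerResponseSketch<P> {
    private final P response;          // set only on success
    private final String errorMessage; // set only on failure
    private final int errorCode;       // meaningful only on failure

    private HandlerResponseSketch(P response, String errorMessage, int errorCode) {
        this.response = response;
        this.errorMessage = errorMessage;
        this.errorCode = errorCode;
    }

    static <P> HandlerResponseSketch<P> successful(P response) {
        return new HandlerResponseSketch<>(Objects.requireNonNull(response), null, 0);
    }

    static <P> HandlerResponseSketch<P> error(String errorMessage, int errorCode) {
        return new HandlerResponseSketch<>(null, Objects.requireNonNull(errorMessage), errorCode);
    }

    boolean wasSuccessful() {
        return response != null;
    }

    P getResponse() {
        if (!wasSuccessful()) {
            throw new IllegalStateException("No response available: " + errorMessage);
        }
        return response;
    }

    String getErrorMessage() {
        return errorMessage;
    }

    int getErrorCode() {
        return errorCode;
    }
}
```

With this shape a caller writes `if (r.wasSuccessful()) { use(r.getResponse()); } else { report(r.getErrorCode()); }`, which is the one check the comment says remains necessary either way; a split into success and error subtypes would replace it with an instanceof test plus a cast.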
[jira] [Assigned] (FLINK-2268) Provide Flink binary release without Hadoop
[ https://issues.apache.org/jira/browse/FLINK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-2268: --- Assignee: Aljoscha Krettek > Provide Flink binary release without Hadoop > --- > > Key: FLINK-2268 > URL: https://issues.apache.org/jira/browse/FLINK-2268 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Robert Metzger >Assignee: Aljoscha Krettek > > Currently, all Flink releases ship with Hadoop 2.3.0 binaries. > The big Hadoop distributions are usually not relying on vanilla Hadoop > releases, but on custom patched versions. > To provide the best user experience, we should offer a Flink binary that uses > the Hadoop jars provided by the user (=hadoop distribution) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136899#comment-16136899 ] ASF GitHub Bot commented on FLINK-7040: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134507274 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -155,38 +129,66 @@ public void shutdown() { .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - synchronized (this) { - // This ensures strict sequential processing of requests. - // If we send new requests immediately we can no longer make assumptions about the order in which responses - // arrive, due to which the handler cannot know which future he should complete (not to mention what response - // type to read). - CompletableFuture nextFuture = lastFuture - .handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor) - .thenCompose((future) -> future); - - lastFuture = nextFuture; - return nextFuture; - } + return submitRequest(httpRequest, messageHeaders); } private , U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture submitRequest(FullHttpRequest httpRequest, M messageHeaders) { - CompletableFuture responseFuture = handler.expectResponse(messageHeaders.getResponseClass()); - - try { - // write request - Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); - channel.writeAndFlush(httpRequest); - channel.closeFuture(); - } catch (InterruptedException e) { - return FutureUtils.completedExceptionally(e); + synchronized (lock) { + CompletableFuture responseFuture = ClientHandler.addHandlerForResponse(bootstrap, sslEngine, messageHeaders.getResponseClass()); + + try { + // write request + Channel channel = 
bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); + channel.writeAndFlush(httpRequest); + channel.closeFuture(); + } catch (InterruptedException e) { + return FutureUtils.completedExceptionally(e); + } + return responseFuture; } - return responseFuture; } - @ChannelHandler.Sharable - private static class ClientHandler extends SimpleChannelInboundHandler { + private static class RestChannelInitializer extends ChannelInitializer { - private volatile ExpectedResponse expectedResponse = null; + private final SSLEngine sslEngine; + private final ClientHandler handler; + + public RestChannelInitializer(SSLEngine sslEngine, ClientHandler handler) { + this.sslEngine = sslEngine; + this.handler = handler; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // SSL should be the first handler in the pipeline + if (sslEngine != null) { + ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + } + + ch.pipeline() + .addLast(new HttpClientCodec()) + .addLast(new HttpObjectAggregator(1024 * 1024)) + .addLast(handler) + .addLast(new PipelineErrorHandler(LOG)); + } + } + + private static class ClientHandler extends SimpleChannelInboundHandler { + + private final ExpectedResponse expectedResponse; + + private ClientHandler(ExpectedResponse expectedResponse) { + this.expectedResponse = expectedResponse; + } + + static CompletableFuture addHandlerForResponse(Bootstrap bootStrap, SSLEngine sslEngine, Class expectedResponse) { + CompletableFuture responseFuture = new CompletableFuture<>();
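The lines removed in this diff chained every new request onto the previous request's future so that requests were processed strictly one after another. A standalone sketch of that chaining idea, using only `java.util.concurrent.CompletableFuture`, might look as follows. The class and method names are hypothetical, the request is modelled as a plain `Supplier`, and `handle` is used instead of `handleAsync` with a direct executor, which is assumed to behave the same way for this purpose.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of strictly sequential request submission via future chaining.
final class SequentialSubmitterSketch {
    private final Object lock = new Object();

    // Completion of this future marks the end of the most recently queued request.
    private CompletableFuture<?> lastFuture = CompletableFuture.completedFuture(null);

    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request) {
        synchronized (lock) {
            // handle() fires whether the previous request succeeded or failed,
            // so one failed request does not block the ones queued after it.
            CompletableFuture<T> nextFuture = lastFuture
                .handle((ignoredResult, ignoredError) -> request.get())
                .thenCompose(future -> future);

            lastFuture = nextFuture;
            return nextFuture;
        }
    }
}
```

The cost of this design is that a slow response delays every later request, which is presumably one motivation for the per-request handler that the added lines of the diff introduce.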
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134507274 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -155,38 +129,66 @@ public void shutdown() { .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - synchronized (this) { - // This ensures strict sequential processing of requests. - // If we send new requests immediately we can no longer make assumptions about the order in which responses - // arrive, due to which the handler cannot know which future he should complete (not to mention what response - // type to read). - CompletableFuture nextFuture = lastFuture - .handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor) - .thenCompose((future) -> future); - - lastFuture = nextFuture; - return nextFuture; - } + return submitRequest(httpRequest, messageHeaders); } private , U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture submitRequest(FullHttpRequest httpRequest, M messageHeaders) { - CompletableFuture responseFuture = handler.expectResponse(messageHeaders.getResponseClass()); - - try { - // write request - Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); - channel.writeAndFlush(httpRequest); - channel.closeFuture(); - } catch (InterruptedException e) { - return FutureUtils.completedExceptionally(e); + synchronized (lock) { + CompletableFuture responseFuture = ClientHandler.addHandlerForResponse(bootstrap, sslEngine, messageHeaders.getResponseClass()); + + try { + // write request + Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); + channel.writeAndFlush(httpRequest); + channel.closeFuture(); + } catch (InterruptedException e) { + return FutureUtils.completedExceptionally(e); + } + 
return responseFuture; } - return responseFuture; } - @ChannelHandler.Sharable - private static class ClientHandler extends SimpleChannelInboundHandler { + private static class RestChannelInitializer extends ChannelInitializer { - private volatile ExpectedResponse expectedResponse = null; + private final SSLEngine sslEngine; + private final ClientHandler handler; + + public RestChannelInitializer(SSLEngine sslEngine, ClientHandler handler) { + this.sslEngine = sslEngine; + this.handler = handler; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // SSL should be the first handler in the pipeline + if (sslEngine != null) { + ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + } + + ch.pipeline() + .addLast(new HttpClientCodec()) + .addLast(new HttpObjectAggregator(1024 * 1024)) + .addLast(handler) + .addLast(new PipelineErrorHandler(LOG)); + } + } + + private static class ClientHandler extends SimpleChannelInboundHandler { + + private final ExpectedResponse expectedResponse; + + private ClientHandler(ExpectedResponse expectedResponse) { + this.expectedResponse = expectedResponse; + } + + static CompletableFuture addHandlerForResponse(Bootstrap bootStrap, SSLEngine sslEngine, Class expectedResponse) { + CompletableFuture responseFuture = new CompletableFuture<>(); + + ClientHandler handler = new ClientHandler<>(new ExpectedResponse<>(expectedResponse, responseFuture)); + bootStrap.handler(new RestChannelInitializer(sslEngine, handler)); + + re
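The comment removed in the diff above describes strict sequential processing: each request is chained onto the future of the previous one, so responses cannot arrive out of order. A minimal sketch of that chaining pattern follows, assuming a hypothetical `SequentialSubmitter` class that is not part of the PR; it uses `handle` where the removed code used `handleAsync` with a direct executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper (not from the PR): runs async requests strictly one after another. */
final class SequentialSubmitter {
    private final Object lock = new Object();

    // Tail of the chain; starts out completed so the first request runs immediately.
    private CompletableFuture<?> lastFuture = CompletableFuture.completedFuture(null);

    /** Starts the request only after the previously submitted one has finished, successfully or not. */
    public <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request) {
        synchronized (lock) {
            CompletableFuture<T> next = lastFuture
                // handle() fires on success and on failure, so a failed request does not stall the queue
                .handle((ignoredResult, ignoredError) -> request.get())
                // flatten CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>
                .thenCompose(future -> future);
            lastFuture = next;
            return next;
        }
    }
}
```

The trade-off is throughput: a slow response delays every later request, which is presumably why the new revision creates one handler per request instead.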
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136898#comment-16136898 ] ASF GitHub Bot commented on FLINK-7040: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134507146 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +/** + * Simple container for the response of a handler, that contains either a response of type {@code P} if the incoming + * request was handled successfully, otherwise it contains an error message and an associated error code. + * + * @param type of successful response + */ +public final class HandlerResponse { --- End diff -- See the checks when accessing the different fields. 
This can easily throw an exception if the user did not properly check what's the type of the response.

> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management, Mesos
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the path under which the savepoint was stored? Maybe always having to specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new notifications from the execution of the given job/Opens WebSocket to receive notifications
> The first four REST calls will be served by the REST endpoint running in the application master/cluster entrypoint. The other calls will be served by a REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.
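The REST calls in the issue description can be tabulated in code. The sketch below is illustrative only: the enum names `RestCall`, `HttpMethod` and `ServedBy` are hypothetical and not taken from Flink; only the verbs and the split between the two endpoints come from the description.

```java
/** HTTP verbs named in the issue description. */
enum HttpMethod { GET, POST, PUT }

/** Which endpoint serves a call, per the issue description. */
enum ServedBy { CLUSTER_ENTRYPOINT, JOB_MANAGER }

/** Hypothetical catalogue of the REST calls listed in FLINK-7040. */
enum RestCall {
    // The first four calls are served by the application master/cluster entrypoint.
    LIST_JOBS(HttpMethod.GET, ServedBy.CLUSTER_ENTRYPOINT),
    SUBMIT_JOB(HttpMethod.POST, ServedBy.CLUSTER_ENTRYPOINT),
    LOOKUP_JOB_LEADER(HttpMethod.GET, ServedBy.CLUSTER_ENTRYPOINT),
    GET_JOB_STATUS(HttpMethod.GET, ServedBy.CLUSTER_ENTRYPOINT),
    // The remaining calls are served by the endpoint running alongside the JobManager.
    CANCEL_JOB(HttpMethod.PUT, ServedBy.JOB_MANAGER),
    STOP_JOB(HttpMethod.PUT, ServedBy.JOB_MANAGER),
    TAKE_SAVEPOINT(HttpMethod.POST, ServedBy.JOB_MANAGER),
    GET_KV_STATE(HttpMethod.GET, ServedBy.JOB_MANAGER),
    // The issue also mentions a WebSocket variant for notifications; only the GET poll is modelled here.
    POLL_NOTIFICATIONS(HttpMethod.GET, ServedBy.JOB_MANAGER);

    private final HttpMethod method;
    private final ServedBy servedBy;

    RestCall(HttpMethod method, ServedBy servedBy) {
        this.method = method;
        this.servedBy = servedBy;
    }

    public HttpMethod method() {
        return method;
    }

    public ServedBy servedBy() {
        return servedBy;
    }
}
```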
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134507146 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +/** + * Simple container for the response of a handler, that contains either a response of type {@code P} if the incoming + * request was handled successfully, otherwise it contains an error message and an associated error code. + * + * @param type of successful response + */ +public final class HandlerResponse { --- End diff -- See the checks when accessing the different fields. This can easily throw an exception if the user did not properly check what's the type of the response.
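The review comment above is about accessors that throw when the caller reads the wrong field. One way around that, sketched here under assumptions, is an either-style container whose only accessor forces both cases to be handled. The `Result` type below is hypothetical and is not the PR's `HandlerResponse`; a plain `int` stands in for the HTTP status code.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical either-style container; not the HandlerResponse class from the PR. */
abstract class Result<P> {
    private Result() {}

    /** The single accessor: callers must supply a branch for success and one for failure. */
    public abstract <T> T fold(
        Function<? super P, ? extends T> onSuccess,
        BiFunction<? super String, ? super Integer, ? extends T> onError);

    /** Wraps the response of a successfully handled request. */
    public static <P> Result<P> successful(P response) {
        return new Result<P>() {
            @Override
            public <T> T fold(
                    Function<? super P, ? extends T> onSuccess,
                    BiFunction<? super String, ? super Integer, ? extends T> onError) {
                return onSuccess.apply(response);
            }
        };
    }

    /** Wraps an error message and its status code. */
    public static <P> Result<P> error(String message, int statusCode) {
        return new Result<P>() {
            @Override
            public <T> T fold(
                    Function<? super P, ? extends T> onSuccess,
                    BiFunction<? super String, ? super Integer, ? extends T> onError) {
                return onError.apply(message, statusCode);
            }
        };
    }
}
```

With this shape there is no field the caller can read in the wrong state, so the runtime checks the comment refers to are not needed; the cost is a slightly heavier call site than a plain getter.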
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136896#comment-16136896 ] ASF GitHub Bot commented on FLINK-7040: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134506551 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ParameterMapper; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * This client is the counter-part to the {@link RestServerEndpoint}. + */ +public class RestClientEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class); + + private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + private final String configuredTargetAddress; + private final int configuredTargetPort; + private final SSLEngine sslEngine; + + private Bootstrap bootstrap; + + private final Client
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134506551 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ParameterMapper; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * This client is the counter-part to the {@link RestServerEndpoint}. + */ +public class RestClientEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class); + + private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + private final String configuredTargetAddress; + private final int configuredTargetPort; + private final SSLEngine sslEngine; + + private Bootstrap bootstrap; + + private final ClientHandler handler = new ClientHandler(); + + private CompletableFuture lastFuture = CompletableFuture.completedFuture(null); + + private final Executor directExecutor = Executors.directExecutor(); + + public RestClientEndpoint(
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136890#comment-16136890 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134505729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -155,38 +129,66 @@ public void shutdown() { .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - synchronized (this) { - // This ensures strict sequential processing of requests. - // If we send new requests immediately we can no longer make assumptions about the order in which responses - // arrive, due to which the handler cannot know which future he should complete (not to mention what response - // type to read). - CompletableFuture nextFuture = lastFuture - .handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor) - .thenCompose((future) -> future); - - lastFuture = nextFuture; - return nextFuture; - } + return submitRequest(httpRequest, messageHeaders); } private , U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture submitRequest(FullHttpRequest httpRequest, M messageHeaders) { - CompletableFuture responseFuture = handler.expectResponse(messageHeaders.getResponseClass()); - - try { - // write request - Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); - channel.writeAndFlush(httpRequest); - channel.closeFuture(); - } catch (InterruptedException e) { - return FutureUtils.completedExceptionally(e); + synchronized (lock) { + CompletableFuture responseFuture = ClientHandler.addHandlerForResponse(bootstrap, sslEngine, messageHeaders.getResponseClass()); + + try { + // write request + Channel channel = 
bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); + channel.writeAndFlush(httpRequest); + channel.closeFuture(); + } catch (InterruptedException e) { + return FutureUtils.completedExceptionally(e); + } + return responseFuture; } - return responseFuture; } - @ChannelHandler.Sharable - private static class ClientHandler extends SimpleChannelInboundHandler { + private static class RestChannelInitializer extends ChannelInitializer { - private volatile ExpectedResponse expectedResponse = null; + private final SSLEngine sslEngine; + private final ClientHandler handler; + + public RestChannelInitializer(SSLEngine sslEngine, ClientHandler handler) { + this.sslEngine = sslEngine; + this.handler = handler; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // SSL should be the first handler in the pipeline + if (sslEngine != null) { + ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + } + + ch.pipeline() + .addLast(new HttpClientCodec()) + .addLast(new HttpObjectAggregator(1024 * 1024)) + .addLast(handler) + .addLast(new PipelineErrorHandler(LOG)); + } + } + + private static class ClientHandler extends SimpleChannelInboundHandler { + + private final ExpectedResponse expectedResponse; + + private ClientHandler(ExpectedResponse expectedResponse) { + this.expectedResponse = expectedResponse; + } + + static CompletableFuture addHandlerForResponse(Bootstrap bootStrap, SSLEngine sslEngine, Class expectedResponse) { + CompletableFuture responseFuture = new CompletableFuture<>(); +
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134505729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -155,38 +129,66 @@ public void shutdown() { .set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - synchronized (this) { - // This ensures strict sequential processing of requests. - // If we send new requests immediately we can no longer make assumptions about the order in which responses - // arrive, due to which the handler cannot know which future he should complete (not to mention what response - // type to read). - CompletableFuture nextFuture = lastFuture - .handleAsync((f, e) -> submitRequest(httpRequest, messageHeaders), directExecutor) - .thenCompose((future) -> future); - - lastFuture = nextFuture; - return nextFuture; - } + return submitRequest(httpRequest, messageHeaders); } private , U extends ParameterMapper, R extends RequestBody, P extends ResponseBody> CompletableFuture submitRequest(FullHttpRequest httpRequest, M messageHeaders) { - CompletableFuture responseFuture = handler.expectResponse(messageHeaders.getResponseClass()); - - try { - // write request - Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); - channel.writeAndFlush(httpRequest); - channel.closeFuture(); - } catch (InterruptedException e) { - return FutureUtils.completedExceptionally(e); + synchronized (lock) { + CompletableFuture responseFuture = ClientHandler.addHandlerForResponse(bootstrap, sslEngine, messageHeaders.getResponseClass()); + + try { + // write request + Channel channel = bootstrap.connect(configuredTargetAddress, configuredTargetPort).sync().channel(); + channel.writeAndFlush(httpRequest); + channel.closeFuture(); + } catch (InterruptedException e) { + return FutureUtils.completedExceptionally(e); + } + 
return responseFuture; } - return responseFuture; } - @ChannelHandler.Sharable - private static class ClientHandler extends SimpleChannelInboundHandler { + private static class RestChannelInitializer extends ChannelInitializer { - private volatile ExpectedResponse expectedResponse = null; + private final SSLEngine sslEngine; + private final ClientHandler handler; + + public RestChannelInitializer(SSLEngine sslEngine, ClientHandler handler) { + this.sslEngine = sslEngine; + this.handler = handler; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // SSL should be the first handler in the pipeline + if (sslEngine != null) { + ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + } + + ch.pipeline() + .addLast(new HttpClientCodec()) + .addLast(new HttpObjectAggregator(1024 * 1024)) + .addLast(handler) + .addLast(new PipelineErrorHandler(LOG)); + } + } + + private static class ClientHandler extends SimpleChannelInboundHandler { + + private final ExpectedResponse expectedResponse; + + private ClientHandler(ExpectedResponse expectedResponse) { + this.expectedResponse = expectedResponse; + } + + static CompletableFuture addHandlerForResponse(Bootstrap bootStrap, SSLEngine sslEngine, Class expectedResponse) { + CompletableFuture responseFuture = new CompletableFuture<>(); + + ClientHandler handler = new ClientHandler<>(new ExpectedResponse<>(expectedResponse, responseFuture)); + bootStrap.handler(new RestChannelInitializer(sslEngine, handler)); + + return r
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136888#comment-16136888 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134505357 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ParameterMapper; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * This client is the counter-part to the {@link RestServerEndpoint}. + */ +public class RestClientEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class); + + private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + private final String configuredTargetAddress; + private final int configuredTargetPort; + private final SSLEngine sslEngine; + + private Bootstrap bootstrap; + + private final ClientHandle
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134505357 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ParameterMapper; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLEngine; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * This client is the counter-part to the {@link RestServerEndpoint}. + */ +public class RestClientEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class); + + private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + private final String configuredTargetAddress; + private final int configuredTargetPort; + private final SSLEngine sslEngine; + + private Bootstrap bootstrap; + + private final ClientHandler handler = new ClientHandler(); + + private CompletableFuture lastFuture = CompletableFuture.completedFuture(null); + + private final Executor directExecutor = Executors.directExecutor(); + + public RestClientEndpoint(RestCl
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136873#comment-16136873 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134502397 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +/** + * Simple container for the response of a handler, that contains either a response of type {@code P} if the incoming + * request was handled successfully, otherwise it contains an error message and an associated error code. + * + * @param type of successful response + */ +public final class HandlerResponse { --- End diff -- I can change that, but I don't see how this is superior regarding type safety. 
> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management, Mesos
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the savepoint under which the savepoint was stored? Maybe always having to specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new notifications from the execution of the given job/Opens WebSocket to receive notifications
> The first four REST calls will be served by the REST endpoint running in the application master/cluster entrypoint. The other calls will be served by a REST endpoint running along side to the JobManager.
> Detailed information about different implementations and their pros and cons can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
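For readers skimming the thread, the REST calls listed in the FLINK-7040 description can be written down as a small table in code. This is only an illustrative sketch: the enum name `RestCallSketch` and its fields are hypothetical and do not appear in the pull request, and no URL paths are given because the issue does not specify any.

```java
/**
 * Hypothetical summary of the REST calls listed in FLINK-7040.
 * Not part of the pull request; names and fields are assumptions.
 */
enum RestCallSketch {
    // The first four calls are served by the cluster entrypoint endpoint.
    LIST_JOBS("GET", true),
    SUBMIT_JOB("POST", true),
    LOOKUP_JOB_LEADER("GET", true),
    GET_JOB_STATUS("GET", true),
    // The remaining calls are served by the endpoint next to the JobManager.
    CANCEL_JOB("PUT", false),
    STOP_JOB("PUT", false),
    TAKE_SAVEPOINT("POST", false),
    GET_KV_STATE("GET", false),
    // The issue lists "GET, WebSocket" here; only the polling variant is modelled.
    POLL_NOTIFICATIONS("GET", false);

    final String httpMethod;
    final boolean servedByClusterEntrypoint;

    RestCallSketch(String httpMethod, boolean servedByClusterEntrypoint) {
        this.httpMethod = httpMethod;
        this.servedByClusterEntrypoint = servedByClusterEntrypoint;
    }
}
```

A dispatcher could, for instance, filter `RestCallSketch.values()` on `servedByClusterEntrypoint` to decide which endpoint registers which handler.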
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134502397 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +/** + * Simple container for the response of a handler, that contains either a response of type {@code P} if the incoming + * request was handled successfully, otherwise it contains an error message and an associated error code. + * + * @param type of successful response + */ +public final class HandlerResponse { --- End diff -- I can change that, but I don't see how this is superior regarding type safety. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
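To make the Javadoc quoted in the diff above concrete, here is a minimal sketch of a container that holds either a successful response of type `P` or an error message with an error code. It is not the `HandlerResponse` class from the pull request: the class name `HandlerResponseSketch`, the factory method names, and the use of a plain `int` instead of Netty's `HttpResponseStatus` are assumptions made to keep the example free of dependencies.

```java
import java.util.Objects;

/**
 * Hypothetical either-style container, loosely following the Javadoc in the diff.
 * Not the class under review; names and the int error code are assumptions.
 */
final class HandlerResponseSketch<P> {
    private final P response;          // non-null only for successful responses
    private final String errorMessage; // non-null only for failed responses
    private final int errorCode;       // HTTP-style status code, -1 on success

    private HandlerResponseSketch(P response, String errorMessage, int errorCode) {
        this.response = response;
        this.errorMessage = errorMessage;
        this.errorCode = errorCode;
    }

    static <P> HandlerResponseSketch<P> successful(P response) {
        return new HandlerResponseSketch<>(Objects.requireNonNull(response), null, -1);
    }

    static <P> HandlerResponseSketch<P> error(String errorMessage, int errorCode) {
        return new HandlerResponseSketch<>(null, Objects.requireNonNull(errorMessage), errorCode);
    }

    boolean wasSuccessful() {
        return response != null;
    }

    P getResponse() {
        if (!wasSuccessful()) {
            throw new IllegalStateException("Response was not successful.");
        }
        return response;
    }

    String getErrorMessage() {
        if (wasSuccessful()) {
            throw new IllegalStateException("Response was successful.");
        }
        return errorMessage;
    }

    int getErrorCode() {
        if (wasSuccessful()) {
            throw new IllegalStateException("Response was successful.");
        }
        return errorCode;
    }
}
```

Because both constructors are hidden behind the two factory methods, a caller cannot build an instance that carries both a response and an error, which is one way to read the type-safety question raised in the review.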
[jira] [Commented] (FLINK-7489) Remove job lifecycle methods from public JobMasterGateway interface
[ https://issues.apache.org/jira/browse/FLINK-7489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136872#comment-16136872 ] ASF GitHub Bot commented on FLINK-7489: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4573 [FLINK-7489] Remove startJobExecution and suspendExecution from JobMasterGateway ## What is the purpose of the change The job lifecycle methods `startJobExecution` and `suspendExecution` should not be exposed as RPCs. Therefore, this commit removes them from the JobMasterGateway definition. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink removeJMRPCs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4573.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4573 commit 4b1088243a24fc3791c56eff37e1ca5fada8afbd Author: Till Rohrmann Date: 2017-08-22T14:33:05Z [FLINK-7489] Remove startJobExecution and suspendExecution from JobMasterGateway The job lifecycle methods should not be exposed as RPCs. Therefore, this commit removes them from the JobMasterGateway definition. 
> Remove job lifecycle methods from public JobMasterGateway interface
> ---
>
> Key: FLINK-7489
> URL: https://issues.apache.org/jira/browse/FLINK-7489
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Labels: flip-6
>
> The {{JobMaster}} exposes its job lifecycle methods {{startJobExecution}} and {{suspendExecution}} via its {{JobMasterGateway}} to the public. I think these methods should not be exposed because it allows other components to affect the job execution without proper reason. Only the {{JobManagerRunner}} should be responsible for calling these methods and has direct access to the {{JobMaster}} instance. Therefore, these methods can directly be implemented by the {{JobMaster}} without exposing them via the {{JobMasterGateway}}.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4573: [FLINK-7489] Remove startJobExecution and suspendE...
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4573

    [FLINK-7489] Remove startJobExecution and suspendExecution from JobMasterGateway

    ## What is the purpose of the change

    The job lifecycle methods `startJobExecution` and `suspendExecution` should not be exposed as RPCs. Therefore, this commit removes them from the JobMasterGateway definition.

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink removeJMRPCs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4573

commit 4b1088243a24fc3791c56eff37e1ca5fada8afbd
Author: Till Rohrmann
Date: 2017-08-22T14:33:05Z

    [FLINK-7489] Remove startJobExecution and suspendExecution from JobMasterGateway

    The job lifecycle methods should not be exposed as RPCs. Therefore, this commit removes them from the JobMasterGateway definition.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
[jira] [Created] (FLINK-7489) Remove job lifecycle methods from public JobMasterGateway interface
Till Rohrmann created FLINK-7489:

Summary: Remove job lifecycle methods from public JobMasterGateway interface
Key: FLINK-7489
URL: https://issues.apache.org/jira/browse/FLINK-7489
Project: Flink
Issue Type: Improvement
Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor

The {{JobMaster}} exposes its job lifecycle methods {{startJobExecution}} and {{suspendExecution}} via its {{JobMasterGateway}} to the public. I think these methods should not be exposed because it allows other components to affect the job execution without proper reason. Only the {{JobManagerRunner}} should be responsible for calling these methods and has direct access to the {{JobMaster}} instance. Therefore, these methods can directly be implemented by the {{JobMaster}} without exposing them via the {{JobMasterGateway}}.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
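The separation proposed in FLINK-7489 can be sketched roughly as follows. All types here are simplified stand-ins with a `Sketch` suffix, not Flink's actual `JobMaster`, `JobMasterGateway`, or `JobManagerRunner`; the `requestJobStatus` method and the status strings are assumptions added only so the example has something observable.

```java
/** Hypothetical RPC-facing interface: the lifecycle methods are deliberately absent. */
interface JobMasterGatewaySketch {
    String requestJobStatus();
}

/** Hypothetical job master: lifecycle methods exist only on the concrete class. */
final class JobMasterSketch implements JobMasterGatewaySketch {
    private boolean running;

    // Not part of the gateway, so holders of the gateway cannot call these.
    void startJobExecution() {
        running = true;
    }

    void suspendExecution() {
        running = false;
    }

    @Override
    public String requestJobStatus() {
        return running ? "RUNNING" : "SUSPENDED";
    }
}

/** Hypothetical runner: the only component holding the concrete job master. */
final class JobManagerRunnerSketch {
    private final JobMasterSketch jobMaster = new JobMasterSketch();

    void start() {
        jobMaster.startJobExecution();
    }

    void suspend() {
        jobMaster.suspendExecution();
    }

    // Other components only ever receive the narrower gateway type.
    JobMasterGatewaySketch getGateway() {
        return jobMaster;
    }
}
```

With this shape, a component that only has a `JobMasterGatewaySketch` reference can observe the job but cannot start or suspend it without an explicit downcast, which is the restriction the issue asks for.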
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136868#comment-16136868 ] ASF GitHub Bot commented on FLINK-7040: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134499984 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.List; + +/** + * Generic response body for communicating errors on the server. + */ +public final class ErrorResponseBody implements ResponseBody { + + static final String FIELD_NAME_ERRORS = "errors"; + + @JsonProperty(FIELD_NAME_ERRORS) + public final List errors; --- End diff -- Alright, then leave it like it is. 
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134499984 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.List; + +/** + * Generic response body for communicating errors on the server. + */ +public final class ErrorResponseBody implements ResponseBody { + + static final String FIELD_NAME_ERRORS = "errors"; + + @JsonProperty(FIELD_NAME_ERRORS) + public final List errors; --- End diff -- Alright, then leave it like it is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136865#comment-16136865 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134499043 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.List; + +/** + * Generic response body for communicating errors on the server. + */ +public final class ErrorResponseBody implements ResponseBody { + + static final String FIELD_NAME_ERRORS = "errors"; + + @JsonProperty(FIELD_NAME_ERRORS) + public final List errors; --- End diff -- it's part of the API, so we either add it now or break it later. 
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134499043 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.List; + +/** + * Generic response body for communicating errors on the server. + */ +public final class ErrorResponseBody implements ResponseBody { + + static final String FIELD_NAME_ERRORS = "errors"; + + @JsonProperty(FIELD_NAME_ERRORS) + public final List errors; --- End diff -- it's part of the API, so we either add it now or break it later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136853#comment-16136853 ] ASF GitHub Bot commented on FLINK-7040: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4569#discussion_r134494673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.List; + +/** + * Generic response body for communicating errors on the server. + */ +public final class ErrorResponseBody implements ResponseBody { + + static final String FIELD_NAME_ERRORS = "errors"; + + @JsonProperty(FIELD_NAME_ERRORS) + public final List errors; --- End diff -- But it's not yet used and somewhat misleading if it was initialized with a singleton list before. I suggest to remove it until needed. 
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134494673

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +/**
    + * Generic response body for communicating errors on the server.
    + */
    +public final class ErrorResponseBody implements ResponseBody {
    +
    +    static final String FIELD_NAME_ERRORS = "errors";
    +
    +    @JsonProperty(FIELD_NAME_ERRORS)
    +    public final List errors;
    --- End diff --

    But it's not yet used and somewhat misleading if it was initialized with a singleton list before. I suggest to remove it until needed.
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134485505

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +/**
    + * Generic response body for communicating errors on the server.
    + */
    +public final class ErrorResponseBody implements ResponseBody {
    +
    +    static final String FIELD_NAME_ERRORS = "errors";
    +
    +    @JsonProperty(FIELD_NAME_ERRORS)
    +    public final List errors;
    --- End diff --

    The list constructor isn't meant to be private. The idea is to allow us to provide multiple error messages, which can be useful if a handler does multiple actions in parallel and multiple fail.
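The point about multiple error messages can be illustrated with a small stand-in class. This is a minimal sketch, not the class from the pull request: it leaves out the Jackson annotations and the `ResponseBody` interface so that it compiles on its own, and the name `ErrorResponseBodySketch` and the `List<String>` element type are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Stand-in for the ErrorResponseBody under review; not the class from the pull request. */
public final class ErrorResponseBodySketch {

    /** Immutable list of error messages (element type assumed to be String). */
    public final List<String> errors;

    /** Convenience constructor for the common single-error case. */
    public ErrorResponseBodySketch(String error) {
        this(Collections.singletonList(error));
    }

    /** List constructor, kept accessible so a handler can report several failures at once. */
    public ErrorResponseBodySketch(List<String> errors) {
        // Defensive copy so later changes to the caller's list do not leak into the response.
        this.errors = Collections.unmodifiableList(new ArrayList<>(errors));
    }

    public static void main(String[] args) {
        ErrorResponseBodySketch single = new ErrorResponseBodySketch("Job not found.");
        ErrorResponseBodySketch multiple =
            new ErrorResponseBodySketch(Arrays.asList("Savepoint failed.", "Cancel failed."));
        System.out.println(single.errors);
        System.out.println(multiple.errors);
    }
}
```

A handler that runs several actions in parallel could collect one message per failed action and pass the whole list to the second constructor.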
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134474815

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.SecurityOptions;
    +import org.apache.flink.runtime.net.SSLUtils;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.SSLEngine;
    +
    +/**
    + * A configuration object for {@link RestServerEndpoint}s.
    + */
    +public final class RestServerEndpointConfiguration {
    +
    +    private final String restBindAddress;
    +    private final int restBindPort;
    +    private final SSLEngine sslEngine;
    +
    +    private RestServerEndpointConfiguration(String restBindAddress, int targetRestEndpointPort, SSLEngine sslEngine) {
    +        this.restBindAddress = restBindAddress;
    +        this.restBindPort = targetRestEndpointPort;
    +        this.sslEngine = sslEngine;
    --- End diff --

    Then please annotate them with `@Nullable`
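A minimal sketch of what the requested annotation could look like. The class name `EndpointConfigSketch`, the accessors, and the choice of which fields are nullable are assumptions; the review comment does not say which fields "them" refers to. The `@Nullable` annotation is declared locally only so that the example compiles without extra dependencies; the real code would presumably use an existing annotation such as `javax.annotation.Nullable`.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.net.ssl.SSLEngine;

/** Sketch of a configuration object whose optional values are marked as nullable. */
public final class EndpointConfigSketch {

    /** Local stand-in for a Nullable annotation, declared here to avoid a dependency. */
    @Documented
    @Retention(RetentionPolicy.CLASS)
    @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
    public @interface Nullable {}

    @Nullable
    private final String restBindAddress; // assumed optional
    private final int restBindPort;
    @Nullable
    private final SSLEngine sslEngine; // assumed to be null when SSL is disabled

    public EndpointConfigSketch(
            @Nullable String restBindAddress, int restBindPort, @Nullable SSLEngine sslEngine) {
        this.restBindAddress = restBindAddress;
        this.restBindPort = restBindPort;
        this.sslEngine = sslEngine;
    }

    /** Returns the bind address, or null if none was configured. */
    @Nullable
    public String getRestBindAddress() {
        return restBindAddress;
    }

    public int getRestBindPort() {
        return restBindPort;
    }

    /** Returns the SSL engine, or null if SSL is disabled. */
    @Nullable
    public SSLEngine getSslEngine() {
        return sslEngine;
    }

    public static void main(String[] args) {
        EndpointConfigSketch config = new EndpointConfigSketch(null, 8081, null);
        // Callers have to check nullable values before using them.
        System.out.println(config.getSslEngine() == null ? "SSL disabled" : "SSL enabled");
    }
}
```

Annotating the fields, the constructor parameters and the getters tells both readers and static analysis tools where a null check is needed.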
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4569#discussion_r134474895

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +
    +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
    +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
    +
    +import com.fasterxml.jackson.core.JsonParseException;
    +import com.fasterxml.jackson.databind.JsonMappingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +
    +/**
    + * Super class for netty-based handlers that work with {@link RequestBody}s and {@link ResponseBody}s.
    + *
    + * Subclasses must be thread-safe.
    + *
    + * @param type of incoming requests
    + * @param type of outgoing responses
    + */
    +@ChannelHandler.Sharable
    +public abstract class AbstractRestHandler extends SimpleChannelInboundHandler {
    +    protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +    private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
    +
    +    private final MessageHeaders messageHeaders;
    +
    +    protected AbstractRestHandler(MessageHeaders messageHeaders) {
    +        this.messageHeaders = messageHeaders;
    +    }
    +
    +    public MessageHeaders getMessageHeaders() {
    +        return messageHeaders;
    +    }
    +
    +    @Override
    +    protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
    +        log.debug("Received request.");
    +        final HttpRequest httpRequest = routed.request();
    +
    +        try {
    +            if (!(httpR
[jira] [Closed] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture
     [ https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-5851.
--------------------------------
    Resolution: Fixed

Fixed via 40cec17f4303b43bbf65d8be542f0646eada57e8

> Renaming AsyncCollector into ResultPromise/ResultFuture
> -------------------------------------------------------
>
>                 Key: FLINK-5851
>                 URL: https://issues.apache.org/jira/browse/FLINK-5851
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: mingleizhang
>              Labels: api-breaking
>             Fix For: 1.4.0
>
> Currently, the async I/O API gives an {{AsyncCollector}} to an {{AsyncFunction}} implementation. The name does not really reflect what the {{AsyncCollector}} does since it does not collect but is actually a one time completable future. Therefore, I propose to rename the {{AsyncCollector}} into {{ResultPromise}} or {{ResultFuture}}. This is API changing.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Updated] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture
     [ https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-5851:
---------------------------------
    Labels: api-breaking  (was: )
[jira] [Reopened] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture
     [ https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann reopened FLINK-5851:
----------------------------------
[jira] [Closed] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture
     [ https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-5851.
--------------------------------
      Resolution: Fixed
    Release Note: This change is breaking the API of the AsyncFunction which is now called with a {{ResultFuture}} instead of a {{AsyncCollector}}. In order to complete the future with the result one has to call {{ResultFuture#complete}} or {{ResultFuture#completeExceptionally}} in case of an exceptional completion.

Fixed via 40cec17f4303b43bbf65d8be542f0646eada57e8
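The release note can be illustrated with a short sketch. The `ResultFuture` interface below is a local stand-in that mirrors only the two methods named in the note, and the `Collection` parameter of `complete` is an assumption; it is not Flink's class. `ResultFutureSketch`, `RecordingFuture` and the static `asyncInvoke` helper are hypothetical names, and the helper runs synchronously to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class ResultFutureSketch {

    /** Stand-in for the renamed callback; only the two methods from the release note. */
    public interface ResultFuture<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);
    }

    /** Hypothetical user function body: completes the future exactly once per input. */
    public static void asyncInvoke(String input, ResultFuture<Integer> resultFuture) {
        try {
            // Normal completion: hand over the result collection.
            resultFuture.complete(Collections.singletonList(Integer.parseInt(input)));
        } catch (NumberFormatException e) {
            // Exceptional completion: hand over the failure instead of a result.
            resultFuture.completeExceptionally(e);
        }
    }

    /** Test double that records how it was completed. */
    public static class RecordingFuture implements ResultFuture<Integer> {
        public final List<Integer> results = new ArrayList<>();
        public Throwable error;

        @Override
        public void complete(Collection<Integer> result) {
            results.addAll(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            this.error = error;
        }
    }

    public static void main(String[] args) {
        RecordingFuture ok = new RecordingFuture();
        asyncInvoke("42", ok);
        RecordingFuture failed = new RecordingFuture();
        asyncInvoke("not a number", failed);
        System.out.println(ok.results + ", failed=" + (failed.error != null));
    }
}
```

Code written against the old name would replace the `AsyncCollector` parameter with `ResultFuture` and call one of these two methods where it previously handed results or errors to the collector.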
[GitHub] flink pull request #4243: [FLINK-5851] [streaming API] Rename AsyncCollector...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4243
[jira] [Assigned] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture
     [ https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann reassigned FLINK-5851:
------------------------------------
    Assignee: mingleizhang