[GitHub] flink pull request #2460: [FLINK-4562] table examples make an divided module...

2016-09-02 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2460

[FLINK-4562] table examples make an divided module in flink-examples

Only move the table example code to a separate module in flink-examples.
The Table API module shouldn't contain example code.

[issue](https://issues.apache.org/jira/browse/FLINK-4562)

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-4562] table 
examples make an divided module in flink-examples")



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink table-example-module

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2460


commit 4fb919b7bc31a269bfa82e32ae67fb43065ba6ac
Author: shijinkui 
Date:   2016-09-02T06:50:11Z

[FLINK-4562] table examples make an divided module in flink-examples




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4562) table examples make an divided module in flink-examples

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457733#comment-15457733
 ] 

ASF GitHub Bot commented on FLINK-4562:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2460

[FLINK-4562] table examples make an divided module in flink-examples

Only move the table example code to a separate module in flink-examples.
The Table API module shouldn't contain example code.

[issue](https://issues.apache.org/jira/browse/FLINK-4562)

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-4562] table 
examples make an divided module in flink-examples")



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink table-example-module

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2460


commit 4fb919b7bc31a269bfa82e32ae67fb43065ba6ac
Author: shijinkui 
Date:   2016-09-02T06:50:11Z

[FLINK-4562] table examples make an divided module in flink-examples




> table examples make an divided module in flink-examples
> ---
>
> Key: FLINK-4562
> URL: https://issues.apache.org/jira/browse/FLINK-4562
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> Example code shouldn't be packaged in the table module.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...

2016-09-02 Thread chiwanpark
Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/2459
  
Hi @shijinkui, thanks for opening this pull request. Unfortunately, this pull 
request causes a problem with the Maven Shade Plugin 
(https://issues.apache.org/jira/browse/MSHADE-200). Because of that problem, the 
Flink community decided to use a literal version string instead of a property, 
and added a shell script (tools/change-scala-version.sh) to switch the Scala 
version.




[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457750#comment-15457750
 ] 

ASF GitHub Bot commented on FLINK-4561:
---

Github user chiwanpark commented on the issue:

https://github.com/apache/flink/pull/2459
  
Hi @shijinkui, thanks for opening this pull request. Unfortunately, this pull 
request causes a problem with the Maven Shade Plugin 
(https://issues.apache.org/jira/browse/MSHADE-200). Because of that problem, the 
Flink community decided to use a literal version string instead of a property, 
and added a shell script (tools/change-scala-version.sh) to switch the Scala 
version.


> replace all the scala version as a `scala.binary.version` property
> --
>
> Key: FLINK-4561
> URL: https://issues.apache.org/jira/browse/FLINK-4561
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> Replace every hard-coded Scala version (2.10) with a `scala.binary.version` 
> property defined in the root pom properties. The default value of the 
> property is 2.10.
> Changes:
> 1. dependencies that include the Scala version
> 2. module definitions that include the Scala version
> 3. upgrade the Scala version from 2.11.7 to 2.11.8





[jira] [Commented] (FLINK-4513) Kafka connector documentation refers to Flink 1.1-SNAPSHOT

2016-09-02 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457773#comment-15457773
 ] 

Simone Robutti commented on FLINK-4513:
---

I noticed that this is true for the docs of every connector. It should be just 
1.1, correct?

> Kafka connector documentation refers to Flink 1.1-SNAPSHOT
> --
>
> Key: FLINK-4513
> URL: https://issues.apache.org/jira/browse/FLINK-4513
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.1
>Reporter: Fabian Hueske
>Priority: Trivial
> Fix For: 1.1.2
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 





[GitHub] flink issue #2447: [FLINK-4490] [distributed coordination] Decouple the JobM...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2447
  
Looking into it now




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457779#comment-15457779
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2447
  
Looking into it now


> Decouple Slot and Instance
> --
>
> Key: FLINK-4490
> URL: https://issues.apache.org/jira/browse/FLINK-4490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Kurt Young
> Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} hold references to each other. It makes 
> sense for {{Instance}} to hold its {{Slot}}s, because that reflects how many 
> resources it can provide and how many are in use. 
> But {{Slot}} does not really need to hold the {{Instance}} it belongs to. It 
> only needs some connection information and a gateway to talk to. Another 
> downside of {{Slot}} holding {{Instance}} is that {{Instance}} contains the 
> allocation/de-allocation logic, which makes it difficult to refactor the 
> allocation without affecting {{Slot}}. 
> We should abstract the connection information of {{Instance}} so that {{Slot}} 
> can hold it instead. (We already have {{InstanceConnectionInfo}}, but it lacks 
> the instance's akka gateway; maybe we can just add the akka gateway to 
> {{InstanceConnectionInfo}}.)
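
The decoupling described above could look roughly like the following sketch. 
It is not Flink's actual code: `Location`, `Gateway`, `Slot` and `Instance` 
here are hypothetical, simplified stand-ins, and the fields (`resourceId`, 
`hostname`, `dataPort`) are assumptions made for illustration. The point is 
only that a slot carries connection information and a gateway, while the 
allocation bookkeeping stays inside the instance. De-allocation is left out to 
keep the sketch short.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-ins for illustration; not Flink's actual classes. */
public class SlotDecouplingSketch {

    /** Pure connection information: where a task manager can be reached. */
    static final class Location {
        final String resourceId;
        final String hostname;
        final int dataPort;

        Location(String resourceId, String hostname, int dataPort) {
            this.resourceId = Objects.requireNonNull(resourceId);
            this.hostname = Objects.requireNonNull(hostname);
            this.dataPort = dataPort;
        }
    }

    /** Stand-in for the (akka) gateway used to talk to the task manager. */
    interface Gateway {
        String path();
    }

    /** A slot holds only a location and a gateway, never its owning instance. */
    static final class Slot {
        final Location location;
        final Gateway gateway;
        final int slotNumber;

        Slot(Location location, Gateway gateway, int slotNumber) {
            this.location = location;
            this.gateway = gateway;
            this.slotNumber = slotNumber;
        }
    }

    /** The instance keeps the allocation bookkeeping to itself. */
    static final class Instance {
        private final Location location;
        private final Gateway gateway;
        private final int numberOfSlots;
        private int allocated;

        Instance(Location location, Gateway gateway, int numberOfSlots) {
            this.location = location;
            this.gateway = gateway;
            this.numberOfSlots = numberOfSlots;
        }

        /** Returns a new slot, or null when all slots are taken. */
        Slot allocateSlot() {
            if (allocated >= numberOfSlots) {
                return null;
            }
            return new Slot(location, gateway, allocated++);
        }

        int availableSlots() {
            return numberOfSlots - allocated;
        }
    }

    public static void main(String[] args) {
        Location location = new Location("tm-1", "localhost", 6121);
        Instance instance = new Instance(location, () -> "akka://flink/user/taskmanager", 2);
        Slot slot = instance.allocateSlot();
        System.out.println(slot.location.hostname + " slot " + slot.slotNumber);
    }
}
```

With this shape, the allocation logic in `Instance` can change without `Slot` 
noticing, since `Slot` never calls back into it.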





[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457902#comment-15457902
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77309411
  
--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala ---
@@ -19,20 +19,19 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
   * messages.
   */
 class MesosTaskManager(
 config: TaskManagerConfiguration,
 resourceID: ResourceID,
-connectionInfo: InstanceConnectionInfo,
+connectionInfo: TaskManagerLocation,
--- End diff --

Shall we rename the variable accordingly?




[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77309411
  
--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala ---
@@ -19,20 +19,19 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
   * messages.
   */
 class MesosTaskManager(
 config: TaskManagerConfiguration,
 resourceID: ResourceID,
-connectionInfo: InstanceConnectionInfo,
+connectionInfo: TaskManagerLocation,
--- End diff --

Shall we rename the variable accordingly?




[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...

2016-09-02 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2397
  
Could you rebase to master again, so that the build turns green?
https://travis-ci.org/apache/flink/builds/155979197
Thank you!




[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457905#comment-15457905
 ] 

ASF GitHub Bot commented on FLINK-4439:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2397
  
Could you rebase to master again, so that the build turns green?
https://travis-ci.org/apache/flink/builds/155979197
Thank you!


> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> --
>
> Key: FLINK-4439
> URL: https://issues.apache.org/jira/browse/FLINK-4439
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Gheorghe Gheorghe
>Priority: Minor
>
> The "flink-connector-kafka-0.8_2"  is logging the following error when all 
> 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08. 
> See stacktrace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating with broker inexistentKafkHost:9092 to find partitions for [testTopic].class java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>   at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
>   at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>   at MetricsFromKafka.main(MetricsFromKafka.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at sbt.Run.invokeMain(Run.scala:67)
>   at sbt.Run.run0(Run.scala:61)
>   at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>   at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Logger$$anon$4.apply(Logger.scala:84)
>   at sbt.TrapExit$App.run(TrapExit.scala:248)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stack trace it is hard to figure out that the servers provided 
> in the config cannot be resolved to a valid IP address. Moreover, the Flink 
> Kafka consumer will try all of those servers one by one and fail to get 
> partition information.
> The suggested improvement is to fail fast and inform the user that the 
> servers provided in the 'bootstrap.servers' config are invalid. If at least 
> one server is valid, the exception should not be thrown. 
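
The fail-fast check suggested above might be sketched as follows. This is not 
the code from the pull request: the class `BootstrapServersCheck`, its method 
names, and the error messages are hypothetical. It assumes the property is a 
comma-separated list of `host:port` entries and treats a server as valid when 
its host name resolves. The resolver is passed in as a predicate so the check 
can be exercised without a network.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.Predicate;

/** Hypothetical helper for illustration; not part of the Flink Kafka connector. */
public class BootstrapServersCheck {

    /** Returns true if the host name can be resolved to an IP address. */
    static boolean resolvesViaDns(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    /**
     * Throws if no entry of the comma-separated "host:port" list has a
     * resolvable host. A single valid server is enough to pass.
     */
    static void validate(String bootstrapServers, Predicate<String> resolvable) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("'bootstrap.servers' is not set");
        }
        for (String server : bootstrapServers.split(",")) {
            String trimmed = server.trim();
            int colon = trimmed.lastIndexOf(':');
            // Everything before the last colon is taken as the host part.
            String host = colon >= 0 ? trimmed.substring(0, colon) : trimmed;
            if (!host.isEmpty() && resolvable.test(host)) {
                return;
            }
        }
        throw new IllegalArgumentException(
                "All servers in 'bootstrap.servers' are invalid: " + bootstrapServers);
    }

    public static void main(String[] args) {
        // Uses real DNS resolution; throws if localhost cannot be resolved.
        validate("localhost:9092", BootstrapServersCheck::resolvesViaDns);
        System.out.println("at least one server is resolvable");
    }
}
```

Calling such a check in the consumer's constructor would surface the 
configuration problem directly, instead of the `ClosedChannelException` shown 
in the stack trace.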





[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310204
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = producerSlot.getInstance();
+   final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

Does it make sense to rename the variable to `producerTaskManager`? Then it 
is symmetric to `consumerTaskManager`.




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457918#comment-15457918
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310204
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = producerSlot.getInstance();
+   final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

Does it make sense to rename the variable to `producerTaskManager`? Then it 
is symmetric to `consumerTaskManager`.




[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310357
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java ---
@@ -44,15 +44,15 @@
private final ResultPartitionID partitionID;
 
/** The partition connection info. */
-   private final InstanceConnectionInfo partitionConnectionInfo;
+   private final TaskManagerLocation partitionConnectionInfo;
 
/** The partition connection index. */
private final int partitionConnectionIndex;
 
public PartialInputChannelDeploymentDescriptor(
IntermediateDataSetID resultId,
ResultPartitionID partitionID,
-   InstanceConnectionInfo partitionConnectionInfo,
+   TaskManagerLocation partitionConnectionInfo,
--- End diff --

Shall we adapt the variable names as well? Otherwise it might be a little 
confusing that a `partitionConnectionInfo` is actually of type 
`TaskManagerLocation`.




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457923#comment-15457923
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310357
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java ---
@@ -44,15 +44,15 @@
private final ResultPartitionID partitionID;
 
/** The partition connection info. */
-   private final InstanceConnectionInfo partitionConnectionInfo;
+   private final TaskManagerLocation partitionConnectionInfo;
 
/** The partition connection index. */
private final int partitionConnectionIndex;
 
public PartialInputChannelDeploymentDescriptor(
IntermediateDataSetID resultId,
ResultPartitionID partitionID,
-   InstanceConnectionInfo partitionConnectionInfo,
+   TaskManagerLocation partitionConnectionInfo,
--- End diff --

Shall we adapt the variable names as well? Otherwise it might be a little 
confusing that a `partitionConnectionInfo` is actually of type 
`TaskManagerLocation`.




[GitHub] flink issue #2418: [FLINK-4245] JMXReporter exposes all defined variables

2016-09-02 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2418
  
ok.




[jira] [Commented] (FLINK-4245) Metric naming improvements

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457926#comment-15457926
 ] 

ASF GitHub Bot commented on FLINK-4245:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2418
  
ok.


> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, 
> we can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?
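
Deriving a scope from the tag map, as described above, could be sketched like 
this. It is an illustration only: `MetricScopeSketch` and `formatScope` are 
hypothetical names, and the `<key>` placeholder syntax in the format string is 
an assumption chosen for the example rather than a statement about the 
reporter code under discussion.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not Flink's metric group code. */
public class MetricScopeSketch {

    /** Replaces every <key> placeholder in the format with the matching tag value. */
    static String formatScope(String scopeFormat, Map<String, String> tags) {
        String scope = scopeFormat;
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            scope = scope.replace("<" + tag.getKey() + ">", tag.getValue());
        }
        return scope;
    }

    public static void main(String[] args) {
        // Tag names follow the issue text; the values are made up.
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("taskmanager_host", "localhost");
        tags.put("task_name", "map");
        System.out.println(formatScope("<taskmanager_host>.taskmanager.<task_name>", tags));
    }
}
```

A reporter that thinks in (scope, metric-name) pairs would use the formatted 
string, while a JMX reporter could expose the `tags` map directly and skip the 
parsing step that some users currently perform.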





[GitHub] flink pull request #2345: [FLINK-4340] Remove RocksDB Semi-Async Checkpoint ...

2016-09-02 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2345




[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310730
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = producerSlot.getInstance();
+   final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

But in the future the partition might reside on a machine which was not the 
producer. Thus, I guess that it's good then.




[jira] [Commented] (FLINK-4340) Remove RocksDB Semi-Async Checkpoint Mode

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457932#comment-15457932
 ] 

ASF GitHub Bot commented on FLINK-4340:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2345


> Remove RocksDB Semi-Async Checkpoint Mode
> -
>
> Key: FLINK-4340
> URL: https://issues.apache.org/jira/browse/FLINK-4340
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> This seems to be causing too many problems and is also incompatible with the 
> upcoming key-group/sharding changes that will allow rescaling of keyed state.
> Once this is done we can also close FLINK-4228.





[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457930#comment-15457930
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310730
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = producerSlot.getInstance();
+   final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

But in the future the partition might reside on a machine which was not the 
producer. Thus, I guess that it's good then.




[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r7737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---
@@ -84,22 +90,22 @@
 * Constructs an instance reflecting a registered TaskManager.
 *
 * @param actorGateway The actor gateway to communicate with the remote instance
-* @param connectionInfo The remote connection where the task manager receives requests.
-* @param resourceId The resource id which denotes the resource the task manager uses.
+* @param location The remote connection where the task manager receives requests.
+* @param taskManagerId The resource id which denotes the resource the task manager uses.
 * @param id The id under which the taskManager is registered.
 * @param resources The resources available on the machine.
 * @param numberOfSlots The number of task slots offered by this taskManager.
 */
public Instance(
ActorGateway actorGateway,
-   InstanceConnectionInfo connectionInfo,
-   ResourceID resourceId,
+   TaskManagerLocation location,
+   ResourceID taskManagerId,
--- End diff --

Isn't the resource ID part of the `TaskManagerLocation`?
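
A minimal sketch of what this comment seems to suggest, assuming the location type carries the resource ID itself so that the constructor no longer needs a separate argument. `ResourceIdSketch`, `TaskManagerLocationSketch` and `InstanceSketch` are hypothetical stand-ins for illustration, not the classes in the pull request:

```java
import java.util.Objects;

/** Hypothetical stand-in for a resource ID. */
final class ResourceIdSketch {
    private final String id;

    ResourceIdSketch(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public String toString() {
        return id;
    }
}

/** Hypothetical location type that bundles the resource ID with the connection data. */
final class TaskManagerLocationSketch {
    private final ResourceIdSketch resourceId;
    private final String hostname;
    private final int dataPort;

    TaskManagerLocationSketch(ResourceIdSketch resourceId, String hostname, int dataPort) {
        this.resourceId = Objects.requireNonNull(resourceId);
        this.hostname = Objects.requireNonNull(hostname);
        this.dataPort = dataPort;
    }

    ResourceIdSketch getResourceId() {
        return resourceId;
    }

    String getHostname() {
        return hostname;
    }

    int getDataPort() {
        return dataPort;
    }
}

/** The constructor takes only the location; the ID is derived from it. */
final class InstanceSketch {
    private final TaskManagerLocationSketch location;
    private final int numberOfSlots;

    InstanceSketch(TaskManagerLocationSketch location, int numberOfSlots) {
        this.location = Objects.requireNonNull(location);
        this.numberOfSlots = numberOfSlots;
    }

    ResourceIdSketch getTaskManagerId() {
        // No second copy of the ID that could get out of sync with the location.
        return location.getResourceId();
    }

    int getNumberOfSlots() {
        return numberOfSlots;
    }
}
```

With this shape, a caller cannot pass a resource ID that disagrees with the one stored in the location.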




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457936#comment-15457936
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r7737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---
@@ -84,22 +90,22 @@
 * Constructs an instance reflecting a registered TaskManager.
 *
 * @param actorGateway The actor gateway to communicate with the remote instance
-* @param connectionInfo The remote connection where the task manager receives requests.
-* @param resourceId The resource id which denotes the resource the task manager uses.
+* @param location The remote connection where the task manager receives requests.
+* @param taskManagerId The resource id which denotes the resource the task manager uses.
 * @param id The id under which the taskManager is registered.
 * @param resources The resources available on the machine.
 * @param numberOfSlots The number of task slots offered by this taskManager.
 */
public Instance(
ActorGateway actorGateway,
-   InstanceConnectionInfo connectionInfo,
-   ResourceID resourceId,
+   TaskManagerLocation location,
+   ResourceID taskManagerId,
--- End diff --

Isn't the resource ID part of the `TaskManagerLocation`?







[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457941#comment-15457941
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77311597
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 ---
@@ -147,7 +148,7 @@ public boolean reportHeartBeat(InstanceID instanceId, 
byte[] lastMetricsReport)
public InstanceID registerTaskManager(
ActorRef taskManager,
ResourceID resourceID,
--- End diff --

`ResourceID` should be contained in the `TaskManagerLocation`.







[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77311597
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 ---
@@ -147,7 +148,7 @@ public boolean reportHeartBeat(InstanceID instanceId, 
byte[] lastMetricsReport)
public InstanceID registerTaskManager(
ActorRef taskManager,
ResourceID resourceID,
--- End diff --

`ResourceID` should be contained in the `TaskManagerLocation`.




[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem

2016-09-02 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457967#comment-15457967
 ] 

Maximilian Michels commented on FLINK-4485:
---

Thanks a lot for the minimal example to reproduce the issue. We will reproduce 
and fix the issue as soon as possible.

> Finished jobs in yarn session fill /tmp filesystem
> --
>
> Key: FLINK-4485
> URL: https://issues.apache.org/jira/browse/FLINK-4485
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Niels Basjes
>Priority: Blocker
>
> On a Yarn cluster I start a yarn-session with a few containers and task slots.
> Then I fire a 'large' number of Flink batch jobs in sequence against this 
> yarn session. It is the exact same job (java code) yet it gets different 
> parameters.
> In this scenario it is exporting HBase tables to files in HDFS and the 
> parameters are about which data from which tables and the name of the target 
> directory.
> After running several dozen jobs the job submission started to fail and we 
> investigated.
> We found that the cause was that on the Yarn node which was hosting the 
> jobmanager the /tmp file system was full (4GB was 100% full).
> However, the output of {{du -hcs /tmp}} showed only 200MB in use.
> We found that a very large file (we guess it is the jar of the job) was put 
> in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager.
> As soon as we killed the jobmanager the disk space was freed.
> The summary of the impact of this is that a yarn-session that receives enough 
> jobs brings down the Yarn node for all users.
> See parts of the output we got from {{lsof}} below.
> {code}
> COMMAND  PID    USER     FD    TYPE  DEVICE  SIZE      NODE  NAME
> java     15034  nbasjes  550r  REG   253,17  66219695  245   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 (deleted)
> java     15034  nbasjes  551r  REG   253,17  66219695  252   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 (deleted)
> java     15034  nbasjes  552r  REG   253,17  66219695  267   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 (deleted)
> java     15034  nbasjes  553r  REG   253,17  66219695  250   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 (deleted)
> java     15034  nbasjes  554r  REG   253,17  66219695  288   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 (deleted)
> java     15034  nbasjes  555r  REG   253,17  66219695  298   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 (deleted)
> java     15034  nbasjes  557r  REG   253,17  66219695  254   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 (deleted)
> java     15034  nbasjes  558r  REG   253,17  66219695  292   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 (deleted)
> java     15034  nbasjes  559r  REG   253,17  66219695  275   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 (deleted)
> java     15034  nbasjes  560r  REG   253,17  66219695  159   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 (deleted)
> java     15034  nbasjes  562r  REG   253,17  66219695  238   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 (deleted)
> java     15034  nbasjes  568r  REG   253,17  66219695  246   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 (deleted)
> java     15034  nbasjes  569r  REG   253,17  66219695  255   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0009 (deleted)
> java     15034  nbasjes  571r  REG   253,17  66219695  299   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0026 (deleted)
> java     15034  nbasjes  572r  REG   253,17  66219695  293   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0020 (deleted)
> java     15034  nbasjes  574r  REG   253,17  66219695  256   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0010 (deleted)
> java     15034  nbasjes  575r  REG   253,17  66219695  302   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-000
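
The report above describes deleted temp files whose space is only released once the process lets go of the open handle. The following is a generic sketch of the close-before-delete pattern that avoids this, assuming nothing about Flink's actual blob store code; `TempFileCleanupSketch` and its methods are hypothetical names used only for illustration:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class TempFileCleanupSketch {

    /** Reads the whole file, closes the stream, then deletes the file. Returns the bytes read. */
    static long consumeAndDelete(Path tempFile) {
        long total = 0;
        byte[] buffer = new byte[8192];
        // try-with-resources closes the stream even if reading fails,
        // so no descriptor keeps the deleted file's blocks allocated.
        try (InputStream in = Files.newInputStream(tempFile)) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            Files.delete(tempFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    /** Usage helper: writes `size` bytes to a fresh temp file and runs it through consumeAndDelete. */
    static long roundTrip(int size) {
        try {
            Path p = Files.createTempFile("blob-sketch-", ".tmp");
            Files.write(p, new byte[size]);
            long read = consumeAndDelete(p);
            if (Files.exists(p)) {
                throw new IllegalStateException("temp file still exists");
            }
            return read;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On Linux, a file that is unlinked while a descriptor is still open shows up in {{lsof}} as "(deleted)" and keeps occupying disk space until the descriptor is closed, which matches the gap between {{du}} and the full file system described in the report.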

[jira] [Commented] (FLINK-4544) TaskManager metrics are vulnerable to custom JMX bean installation

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457975#comment-15457975
 ] 

ASF GitHub Bot commented on FLINK-4544:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2445
  
Version 2 is up. Essentially, we now attempt to use the methods before 
registering the metrics. I've also adjusted the new CPU metrics.
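
A rough sketch of the "try the method before registering" idea, assuming the metric is read reflectively from `com.sun.management.OperatingSystemMXBean`. `CpuLoadProbeSketch` and `probeCpuLoad` are hypothetical names, and this is not the code in the pull request:

```java
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.function.Supplier;

final class CpuLoadProbeSketch {

    /**
     * Returns a gauge-like supplier only if the bean really supports
     * getProcessCpuLoad(); otherwise returns empty so nothing is registered.
     */
    static Optional<Supplier<Double>> probeCpuLoad(Object bean) {
        try {
            Class<?> sunBean = Class.forName("com.sun.management.OperatingSystemMXBean");
            // The class being on the classpath does not imply the installed bean is of that type.
            if (!sunBean.isInstance(bean)) {
                return Optional.empty();
            }
            Method method = sunBean.getMethod("getProcessCpuLoad");
            method.invoke(bean); // trial call: fail here, once, instead of on every heartbeat
            Supplier<Double> gauge = () -> {
                try {
                    return (Double) method.invoke(bean);
                } catch (ReflectiveOperationException | RuntimeException e) {
                    return -1.0; // assumption: report a sentinel instead of throwing
                }
            };
            return Optional.of(gauge);
        } catch (ReflectiveOperationException | RuntimeException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Optional<Supplier<Double>> gauge =
                probeCpuLoad(ManagementFactory.getOperatingSystemMXBean());
        System.out.println(gauge.isPresent() ? "CPU load gauge available" : "CPU load gauge skipped");
    }
}
```

The point of the trial call is that a replaced JMX provider is detected once at registration time rather than producing a logged exception on every heartbeat.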


> TaskManager metrics are vulnerable to custom JMX bean installation
> --
>
> Key: FLINK-4544
> URL: https://issues.apache.org/jira/browse/FLINK-4544
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
> Fix For: 1.2.0, 1.1.3
>
>
> The TaskManager's CPU load magic may fail when JMX providers are overwritten.
> The TaskManager logic checks if the class 
> {{com.sun.management.OperatingSystemMXBean}} is available. If yes, it assumes 
> that the {{ManagementFactory.getOperatingSystemMXBean()}} is of that type. 
> That is not necessarily the case.
> This is visible in the Cassandra tests, as Cassandra overrides the JMX 
> provider - every heartbeat causes an exception that is logged (See below), 
> flooding the log, killing the heartbeat message.
> I would also suggest moving the entire metrics code out of the 
> {{TaskManager}} class into a dedicated class {{TaskManagerJvmMetrics}}. That 
> class can, with a static method, install the metrics into the TaskManager's 
> metric group.
> Sample stack trace when default platform beans are overridden:
> {code}
> 23914 [flink-akka.actor.default-dispatcher-3] WARN  org.apache.flink.runtime.taskmanager.TaskManager  - Error retrieving CPU Load through OperatingSystemMXBean
> java.lang.IllegalArgumentException: object is not an instance of declaring class
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
>   at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3$$anonfun$getValue$2.apply(TaskManager.scala:2351)
>   at scala.Option.map(Option.scala:145)
>   at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2351)
>   at org.apache.flink.runtime.taskmanager.TaskManager$$anon$3.getValue(TaskManager.scala:2348)
>   at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:32)
>   at com.codahale.metrics.json.MetricsModule$GaugeSerializer.serialize(MetricsModule.java:20)
>   at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:616)
>   at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:519)
>   at com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:31)
>   at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>   at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2444)
>   at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:355)
>   at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1442)
>   at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:186)
>   at com.codahale.metrics.json.MetricsModule$MetricRegistrySerializer.serialize(MetricsModule.java:171)
>   at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>   at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
>   at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3022)
>   at org.apache.flink.runtime.taskmanager.TaskManager.sendHeartbeatToJobManager(TaskManager.scala:1278)
>   at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:309)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.testingUtils.TestingTaskManagerLike$$anonfun$handleTest

[GitHub] flink issue #2445: [FLINK-4544] Refactor old CPU metric initialization

2016-09-02 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2445
  
Version 2 is up. Essentially, we now attempt to use the methods before 
registering the metrics. I've also adjusted the new CPU metrics.




[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77313861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 ---
@@ -576,7 +560,7 @@ private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {

if (parent == null) {
// root slot, return to the instance.
-   sharedSlot.getInstance().returnAllocatedSlot(sharedSlot);
+   sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
--- End diff --

Can we use `sharedSlot.releaseSlot` instead? It does not seem right to get 
the owner and then call `returnAllocatedSlot` on it with the same slot. 
I think this violates the Law of Demeter.

Alternatively, we could add a method `returnSlot` to `Slot` which calls 
`owner.returnAllocatedSlot(this)`.
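
The alternative proposed above could look roughly like the following sketch. `SlotOwnerSketch` and `SlotSketch` are hypothetical stand-ins, and the boolean return type of `returnAllocatedSlot` is an assumption made only for this illustration:

```java
/** Hypothetical owner interface; stands in for whatever hands out slots. */
interface SlotOwnerSketch {
    boolean returnAllocatedSlot(SlotSketch slot);
}

/** Hypothetical slot that knows how to return itself to its owner. */
class SlotSketch {
    private final SlotOwnerSketch owner;

    SlotSketch(SlotOwnerSketch owner) {
        this.owner = owner;
    }

    /**
     * Callers write slot.returnSlot() instead of
     * slot.getOwner().returnAllocatedSlot(slot), so they never reach
     * through the slot to talk to its owner.
     */
    boolean returnSlot() {
        return owner.returnAllocatedSlot(this);
    }
}
```

Keeping the owner private to the slot also means the owner type can change later without touching call sites such as `internalDisposeEmptySharedSlot`.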




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457976#comment-15457976
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77313861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 ---
@@ -576,7 +560,7 @@ private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {

if (parent == null) {
// root slot, return to the instance.
-   sharedSlot.getInstance().returnAllocatedSlot(sharedSlot);
+   sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
--- End diff --

Can we use `sharedSlot.releaseSlot` instead? It does not seem right to get 
the owner and then call `returnAllocatedSlot` on it with the same slot. 
I think this violates the Law of Demeter.

Alternatively, we could add a method `returnSlot` to `Slot` which calls 
`owner.returnAllocatedSlot(this)`.







[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457980#comment-15457980
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77314097
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
 ---
@@ -41,7 +41,7 @@
 
private final int connectionIndex;
 
-   public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+   public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
--- End diff --

variable name







[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77314097
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
 ---
@@ -41,7 +41,7 @@
 
private final int connectionIndex;
 
-   public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+   public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
--- End diff --

variable name




[GitHub] flink issue #2152: [FLINK-3920] Distributed Linear Algebra: block-based matr...

2016-09-02 Thread chobeat
Github user chobeat commented on the issue:

https://github.com/apache/flink/pull/2152
  
Is Travis still broken? I see it worked for one build but failed for the 
others. Is there anything more to review on this PR, or can we proceed?




[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457984#comment-15457984
 ] 

ASF GitHub Bot commented on FLINK-3920:
---

Github user chobeat commented on the issue:

https://github.com/apache/flink/pull/2152
  
Is Travis still broken? I see it worked for one build but failed for the 
others. Is there anything more to review on this PR, or can we proceed?


> Distributed Linear Algebra: block-based matrix
> --
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Simone Robutti
>Assignee: Simone Robutti
>






[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457992#comment-15457992
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2425
  
T2-3 is not about the web interface Netty; it's about the data transfer 
Netty. 
In Flink, we are using Netty for (at least) three things:
- Akka is using Netty. This is addressed in the pull request.
- The web interface is using Netty. This is addressed as well.
- The user data (datastreams, etc.) is transferred between the 
TaskManagers using Netty as well.



> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-09-02 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2425
  
T2-3 is not about the web interface Netty; it's about the data transfer 
Netty. 
In Flink, we are using Netty for (at least) three things:
- Akka is using Netty. This is addressed in the pull request.
- The web interface is using Netty. This is addressed as well.
- The user data (datastreams, etc.) is transferred between the 
TaskManagers using Netty as well.





[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...

2016-09-02 Thread gheo21
Github user gheo21 commented on the issue:

https://github.com/apache/flink/pull/2397
  
Sure, did it. Let's see if it gets green! Thanks.




[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457997#comment-15457997
 ] 

ASF GitHub Bot commented on FLINK-4439:
---

Github user gheo21 commented on the issue:

https://github.com/apache/flink/pull/2397
  
Sure, did it. Let's see if it gets green! Thanks.


> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> --
>
> Key: FLINK-4439
> URL: https://issues.apache.org/jira/browse/FLINK-4439
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Gheorghe Gheorghe
>Priority: Minor
>
> The "flink-connector-kafka-0.8_2" connector logs the following error when all 
> 'bootstrap.servers' passed to the FlinkKafkaConsumer08 are invalid. 
> See stacktrace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating with broker inexistentKafkHost:9092 to find partitions for [testTopic].class java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>   at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
>   at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>   at MetricsFromKafka.main(MetricsFromKafka.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at sbt.Run.invokeMain(Run.scala:67)
>   at sbt.Run.run0(Run.scala:61)
>   at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>   at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Logger$$anon$4.apply(Logger.scala:84)
>   at sbt.TrapExit$App.run(TrapExit.scala:248)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stack trace it is hard to figure out that the actual servers 
> provided in the config cannot be resolved to a valid IP address. Moreover, the 
> Flink Kafka consumer will try all of those servers one by one and fail to 
> get partition information.
> The suggested improvement is to fail fast and tell the user that the 
> servers provided in the 'bootstrap.servers' config are invalid. If at least 
> one server is valid then the exception should not be thrown. 
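
A minimal sketch of the fail-fast check requested above, assuming "valid" means that the host name resolves. `BootstrapServersCheckSketch`, its methods and the exception message are hypothetical and not taken from the pull request; the resolver is passed in so the check can be exercised without DNS:

```java
import java.net.InetSocketAddress;
import java.util.function.BiPredicate;

final class BootstrapServersCheckSketch {

    /** Throws if none of the comma-separated host:port entries is considered resolvable. */
    static void validate(String bootstrapServers, BiPredicate<String, Integer> resolvable) {
        int valid = 0;
        for (String entry : bootstrapServers.split(",")) {
            String server = entry.trim();
            int colon = server.lastIndexOf(':');
            if (colon <= 0) {
                continue; // malformed entry: no host or no port
            }
            String host = server.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(server.substring(colon + 1));
            } catch (NumberFormatException e) {
                continue; // malformed port
            }
            if (port < 0 || port > 65535) {
                continue; // out of range for a TCP port
            }
            if (resolvable.test(host, port)) {
                valid++;
            }
        }
        if (valid == 0) {
            throw new IllegalArgumentException(
                    "All servers provided in 'bootstrap.servers' are invalid: " + bootstrapServers);
        }
    }

    /** DNS-based resolver: true if the host name can be resolved to an address. */
    static boolean dnsResolvable(String host, int port) {
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    /** Convenience overload that uses real DNS resolution. */
    static void validate(String bootstrapServers) {
        validate(bootstrapServers, BootstrapServersCheckSketch::dnsResolvable);
    }
}
```

As the issue asks, a single resolvable entry is enough to pass; only the case where every entry fails raises the exception, and it does so before any partition lookup is attempted.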





[jira] [Commented] (FLINK-4260) Allow SQL's LIKE ESCAPE

2016-09-02 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457995#comment-15457995
 ] 

Timo Walther commented on FLINK-4260:
-

[~miaoever] Do you still plan to implement this issue? Otherwise I would give 
someone else the chance to solve this issue...

> Allow SQL's LIKE ESCAPE
> ---
>
> Key: FLINK-4260
> URL: https://issues.apache.org/jira/browse/FLINK-4260
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: miaoever
>Priority: Minor
>
> Currently, the SQL API does not support specifying an ESCAPE character in a 
> LIKE expression. The SIMILAR TO should also support that.
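
To illustrate what an ESCAPE character changes, here is a small sketch that
translates a LIKE pattern into a java.util.regex.Pattern. The class LikeEscape
is hypothetical and unrelated to how Flink or Calcite generate code for LIKE.
As a simplification, the escape character may precede any character, whereas
standard SQL only allows it before '%', '_' or the escape character itself.

```java
import java.util.regex.Pattern;

/** Hypothetical illustration of LIKE ... ESCAPE semantics. */
final class LikeEscape {

    /** Translates a LIKE pattern into a regex, honoring the escape character. */
    static Pattern toRegex(String like, char escape) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < like.length(); i++) {
            char c = like.charAt(i);
            if (c == escape) {
                if (i + 1 >= like.length()) {
                    throw new IllegalArgumentException(
                        "Dangling escape character in pattern: " + like);
                }
                // The escaped character is matched literally.
                sb.append(Pattern.quote(String.valueOf(like.charAt(++i))));
            } else if (c == '%') {
                sb.append(".*"); // any sequence of characters
            } else if (c == '_') {
                sb.append('.');  // exactly one character
            } else {
                sb.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(sb.toString(), Pattern.DOTALL);
    }

    /** Evaluates: value LIKE pattern ESCAPE escape. */
    static boolean like(String value, String pattern, char escape) {
        return toRegex(pattern, escape).matcher(value).matches();
    }
}
```

With '!' as the escape character, like("50%", "50!%", '!') matches only the
literal string "50%", while the unescaped pattern "50%" would also match
"500".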





[jira] [Created] (FLINK-4563) [metrics] scope caching not adjusted for multiple reporters

2016-09-02 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4563:
---

 Summary: [metrics] scope caching not adjusted for multiple 
reporters
 Key: FLINK-4563
 URL: https://issues.apache.org/jira/browse/FLINK-4563
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.1.0
Reporter: Chesnay Schepler


Every metric group contains a scope string, representing what entities 
(job/task/etc.) a given metric belongs to, which is calculated on demand. 

Before this string is cached a CharacterFilter is applied to it, which is 
provided by the caller, usually a reporter. This was done since different 
reporters have different requirements with regard to valid characters. The 
filtered string is cached so that we don't have to refilter the string every 
time.

This all works fine with a single reporter; with multiple reporters, however, 
it is completely broken, as only the first filter is ever applied.
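
One way to picture a fix is to cache one filtered scope string per reporter
instead of a single string. The sketch below is an illustration only:
ScopeCache is a hypothetical class, the CharacterFilter is modeled as a plain
UnaryOperator<String>, and reporters are identified by an assumed index. It
does not claim to be the change that will be made in Flink.

```java
import java.util.function.UnaryOperator;

/** Hypothetical per-reporter cache for filtered scope strings. */
final class ScopeCache {

    private final String rawScope;
    // One cached entry per reporter; null means "not computed yet".
    private final String[] filteredPerReporter;

    ScopeCache(String rawScope, int numReporters) {
        this.rawScope = rawScope;
        this.filteredPerReporter = new String[numReporters];
    }

    /** Returns the scope filtered for the given reporter, filtering at most once. */
    String getScope(int reporterIndex, UnaryOperator<String> filter) {
        String cached = filteredPerReporter[reporterIndex];
        if (cached == null) {
            cached = filter.apply(rawScope);
            filteredPerReporter[reporterIndex] = cached;
        }
        return cached;
    }
}
```

With a single shared cache slot, the second reporter would receive the string
filtered by the first reporter's filter; with one slot per reporter, each
reporter gets its own filtered string and the filter still runs only once per
reporter.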







[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317356
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

+1 for Stephan's proposal




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458026#comment-15458026
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317356
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

+1 for Stephan's proposal


> Decouple Slot and Instance
> --
>
> Key: FLINK-4490
> URL: https://issues.apache.org/jira/browse/FLINK-4490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Kurt Young
> Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} holds each other. For {{Instance}} 
> holding {{Slot}}, it makes sense because it reflects how many resources it 
> can provide and how many are using. 
> But it's not very necessary for {{Slot}} to hold {{Instance}} which it 
> belongs to. It only needs to hold some connection information and gateway to 
> talk to. Another downside for {{Slot}} 

[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458030#comment-15458030
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+   boolean returnAllocatedSlot(Slot slot);
--- End diff --

JavaDocs are missing (even though the method is quite self-explanatory...)


> Decouple Slot and Instance
> --
>
> Key: FLINK-4490
> URL: https://issues.apache.org/jira/browse/FLINK-4490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Kurt Young
> Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} holds each other. For {{Instance}} 
> holding {{Slot}}, it makes sense because it reflects how many resources it 
> can provide and how many are using. 
> But it's not very necessary for {{Slot}} to hold {{Instance}} which it 
> belongs to. It only needs to hold some connection information and gateway to 
> talk to. Another downside for {{Slot}} holding {{Instance}} is that 
> {{Instance}} actually contains some allocate/de-allocation logicals, it will 
> be difficult if we want to do some allocation refactor without letting 
> {{Slot}} noticed. 
> We should abstract the connection information of {{Instance}} to let {{Slot}} 
> holds. (Actually we have {{InstanceConnectionInfo}} now, but lacks of 
> instance's akka gateway, maybe we can just adding the akka gateway to the 
> {{InstanceConnectionInfo}})





[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+   boolean returnAllocatedSlot(Slot slot);
--- End diff --

JavaDocs are missing (even though the method is quite self-explanatory...)




[jira] [Created] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-02 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4564:
---

 Summary: [metrics] Delimiter should be configured per reporter
 Key: FLINK-4564
 URL: https://issues.apache.org/jira/browse/FLINK-4564
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.1.0
Reporter: Chesnay Schepler


Currently, the delimiter used for the scope string is based on a configuration 
setting shared by all reporters. However, different reporters may have 
different requirements with regard to the delimiter, so we should allow each 
reporter to use a different delimiter.

We can keep the current setting as a global setting that is used if no 
reporter-specific setting is set.
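
The fallback described above could look roughly like the following sketch.
The class ReporterDelimiters is hypothetical, and both configuration keys as
well as the "." default are assumptions made for illustration; the issue does
not fix any key names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup of a per-reporter delimiter with a global fallback. */
final class ReporterDelimiters {

    // Assumed key names and default, for illustration only.
    static final String GLOBAL_KEY = "metrics.scope.delimiter";
    static final String DEFAULT_DELIMITER = ".";

    static String reporterKey(String reporterName) {
        return "metrics.reporter." + reporterName + ".scope.delimiter";
    }

    /** Reporter-specific setting first, then the global one, then the default. */
    static String delimiterFor(String reporterName, Map<String, String> config) {
        String specific = config.get(reporterKey(reporterName));
        if (specific != null) {
            return specific;
        }
        return config.getOrDefault(GLOBAL_KEY, DEFAULT_DELIMITER);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(GLOBAL_KEY, "_");
        config.put(reporterKey("jmx"), "-");
        System.out.println(delimiterFor("jmx", config));      // reporter-specific value
        System.out.println(delimiterFor("graphite", config)); // falls back to the global value
    }
}
```

Existing configurations keep working under this scheme, because a reporter
without its own setting behaves exactly as it does with the shared setting
today.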





[jira] [Created] (FLINK-4565) Support for SQL IN operator

2016-09-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4565:
---

 Summary: Support for SQL IN operator
 Key: FLINK-4565
 URL: https://issues.apache.org/jira/browse/FLINK-4565
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


It seems that Flink SQL supports the uncorrelated sub-query IN operator. But it 
should also be available in the Table API and tested.





[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77318844
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
 ---
@@ -1,185 +0,0 @@
-/*
--- End diff --

Good catch :-)




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458051#comment-15458051
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77318844
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
 ---
@@ -1,185 +0,0 @@
-/*
--- End diff --

Good catch :-)







[GitHub] flink issue #2447: [FLINK-4490] [distributed coordination] Decouple the JobM...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2447
  
Really good changes @StephanEwen. +1 for merging.

I had only some minor comments. 

I guess that replacing the `ActorGateways` in the `Slots` and preparing the 
`Scheduler` to work with futures are the next follow-up tasks.




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458061#comment-15458061
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2447
  
Really good changes @StephanEwen. +1 for merging.

I had only some minor comments. 

I guess that replacing the `ActorGateways` in the `Slots` and preparing the 
`Scheduler` to work with futures are the next follow-up tasks.







[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-02 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77320094
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
 ---
@@ -99,7 +110,43 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
currentDecoder.destroy();
currentDecoder = null;
}
-   
+
+   if(secureCookie != null) {
--- End diff --

The YARN code does the auto-population only if security is enabled, so it 
should not be an issue.

> user-provided but will be persisted in the in-memory Flink configuration 
> instance which is passed to the web layer during bootstrap.

The Configuration object in the CliFrontend is not transferred to the 
ApplicationMaster / JobMaster.
The flink-conf.yaml (and on YARN the "dynamic properties") are the only 
sources of the configuration for the AM/JM.




[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-02 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77320247
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -426,4 +440,11 @@ void unregisterConnection(BlobServerConnection conn) {
}
}
 
+   /* Secure cookie to authenticate */
+   @Override
+   public String getSecureCookie() { return secureCookie; }
+
+   /* Flag to indicate if service level authentication is enabled or not */
+   public boolean isSecurityEnabled() { return secureCookie != null; }
--- End diff --

Okay, I see. In the constructor you are checking that the secureCookie is 
set if security is enabled.

Then it's okay to leave it as is.
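
For readers without the full diff at hand, the invariant discussed here can be
sketched as follows. CookieGuardedService is a hypothetical stand-in; the
actual BlobServer constructor is not shown in this thread, so the exact check
and exception type are assumptions.

```java
/** Hypothetical stand-in illustrating the constructor check discussed above. */
final class CookieGuardedService {

    private final String secureCookie;

    CookieGuardedService(boolean securityEnabled, String secureCookie) {
        // Assumed check: with security enabled, a cookie must be configured.
        if (securityEnabled && (secureCookie == null || secureCookie.isEmpty())) {
            throw new IllegalStateException(
                "Security is enabled but no secure cookie is configured");
        }
        // With security disabled the cookie is dropped, so a non-null cookie
        // always means that security is enabled.
        this.secureCookie = securityEnabled ? secureCookie : null;
    }

    /* Secure cookie to authenticate */
    String getSecureCookie() {
        return secureCookie;
    }

    /* Flag to indicate if service level authentication is enabled or not */
    boolean isSecurityEnabled() {
        return secureCookie != null;
    }
}
```

Because the constructor rejects the "enabled but no cookie" combination,
deriving isSecurityEnabled() from the cookie alone cannot disagree with the
configured flag.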




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458071#comment-15458071
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77320094
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
 ---
@@ -99,7 +110,43 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
currentDecoder.destroy();
currentDecoder = null;
}
-   
+
+   if(secureCookie != null) {
--- End diff --

The YARN code does the auto-population only if security is enabled, so it 
should not be an issue.

> user-provided but will be persisted in the in-memory Flink configuration 
> instance which is passed to the web layer during bootstrap.

The Configuration object in the CliFrontend is not transferred to the 
ApplicationMaster / JobMaster.
The flink-conf.yaml (and on YARN the "dynamic properties") are the only 
sources of the configuration for the AM/JM.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458073#comment-15458073
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77320247
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -426,4 +440,11 @@ void unregisterConnection(BlobServerConnection conn) {
}
}
 
+   /* Secure cookie to authenticate */
+   @Override
+   public String getSecureCookie() { return secureCookie; }
+
+   /* Flag to indicate if service level authentication is enabled or not */
+   public boolean isSecurityEnabled() { return secureCookie != null; }
--- End diff --

Okay, I see. In the constructor you are checking that the secureCookie is 
set if security is enabled.

Then it's okay to leave it as is.







[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-02 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77320960
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path);
+   try {
+   File f = new File(path);
+   if(!f.exists()) {
+   f.createNewFile();
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (subNode.containsKey(cookieKey)) {
+   String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   subNode.addProperty(cookieKey, cookie);
+   config.save();
+   LOG.debug("Persisted cookie for the appID: {}", appId);
+   } catch(Exception e) {
+   LOG.error("Exception occurred while persisting app state for app id: {}. Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+   }
+
+   public static String getAppSecureCookie(String appId) {
+   if(appId == null) {
+   String errorMessage = "Application ID cannot be null";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+
+   String cookieFromFile;
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to fetch cookie for the appID: {} from {}", appId, path);
+
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: " + path + " in user home directory";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (!subNode.containsKey(cookieKey)) {
+   String errorMessage = "Could  not find the app ID section in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   cookieFromFile = subNode.getString(cookieKey, "");
+   if(cookieFromFile.length() == 0) {
+   String errorMessage = "Could  not find cookie in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   } catch(Exception e) {
+   LOG.error("Exception occurred while fetching cookie for app id: {} Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+
+   LOG.debug("Found cookie for the appID: {}", appId);
+   return cookieFromFile;
+   }
+
+   public static void removeAppState(String appId) {
+   if(appId == null) { return; }
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to remove the reference for the appId: {} from {}", appId, path);
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: " + path + " in user home directory";
+   LOG.warn(errorMessage);
+   return;
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   config.clearTree(appId);
+   config.save();
+

[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458082#comment-15458082
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77320960
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path);
+   try {
+   File f = new File(path);
+   if(!f.exists()) {
+   f.createNewFile();
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (subNode.containsKey(cookieKey)) {
+   String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   subNode.addProperty(cookieKey, cookie);
+   config.save();
+   LOG.debug("Persisted cookie for the appID: {}", appId);
+   } catch(Exception e) {
+   LOG.error("Exception occurred while persisting app state for app id: {}. Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+   }
+
+   public static String getAppSecureCookie(String appId) {
+   if(appId == null) {
+   String errorMessage = "Application ID cannot be null";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+
+   String cookieFromFile;
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to fetch cookie for the appID: {} from {}", appId, path);
+
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: " + path + " in user home directory";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (!subNode.containsKey(cookieKey)) {
+   String errorMessage = "Could  not find the app ID section in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   cookieFromFile = subNode.getString(cookieKey, "");
+   if(cookieFromFile.length() == 0) {
+   String errorMessage = "Could  not find cookie in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   } catch(Exception e) {
+   LOG.error("Exception occurred while fetching cookie for 
app id: {} Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+
+   LOG.debug("Found cookie for the appID: {}", appId);
+   return cookieFromFile;
+   }
+
+   public static void removeAppState(String appId) {
+   if(appId == null) { return; }
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to remove the reference for the appId: {} from 
{}", appId, path);
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: 
" + path + " in user home directory";
+   LOG.warn(errorMessage);
+ 

[jira] [Created] (FLINK-4566) ProducerFailedException does not properly preserve Exception causes

2016-09-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4566:
---

 Summary: ProducerFailedException does not properly preserve 
Exception causes
 Key: FLINK-4566
 URL: https://issues.apache.org/jira/browse/FLINK-4566
 Project: Flink
  Issue Type: Bug
  Components: Network
Affects Versions: 1.1.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0, 1.1.3


To avoid class loading issues with user-scope exceptions, the 
{{ProducerFailedException}} does not properly store its cause, but only a 
stringified variant of it.

We should use the {{SerializedThrowable}} instead of manual cause 
stringification.
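
A minimal sketch of the idea, assuming plain Java serialization: instead of keeping only a string, the original exception travels as bytes and is restored later with a class loader that knows the user classes. `CarriedThrowable` and `LoaderAwareStream` are hypothetical names for illustration; this is not Flink's {{SerializedThrowable}} and does not claim to match its API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

public class CauseCarrierSketch {

    /** Hypothetical carrier exception; NOT Flink's SerializedThrowable. */
    public static final class CarriedThrowable extends Exception {
        private static final long serialVersionUID = 1L;

        private final byte[] serializedCause;
        private final String originalClassName;

        public CarriedThrowable(Throwable original) {
            // Message and stack trace stay readable even if the class cannot be loaded later.
            super(original.getMessage());
            this.originalClassName = original.getClass().getName();
            this.serializedCause = toBytes(original);
            setStackTrace(original.getStackTrace());
        }

        public String getOriginalClassName() {
            return originalClassName;
        }

        /** Restores the original exception, or returns this carrier if that fails. */
        public Throwable restore(ClassLoader loader) {
            try (ObjectInputStream in =
                    new LoaderAwareStream(new ByteArrayInputStream(serializedCause), loader)) {
                return (Throwable) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                return this;
            }
        }
    }

    /** Resolves classes against the given loader first, then falls back to the default lookup. */
    private static final class LoaderAwareStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                return super.resolveClass(desc);
            }
        }
    }

    private static byte[] toBytes(Throwable t) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(t);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("cause is not serializable", e);
        }
    }

    public static void main(String[] args) {
        Throwable original =
                new IllegalStateException("producer failed", new RuntimeException("disk full"));
        CarriedThrowable carried = new CarriedThrowable(original);
        Throwable restored = carried.restore(CauseCarrierSketch.class.getClassLoader());
        System.out.println(restored.getClass().getName() + ": " + restored.getMessage());
        System.out.println("caused by: " + restored.getCause().getMessage());
    }
}
```

Unlike a stringified cause, the restored object keeps its type and its own cause, so callers can still use `instanceof` checks and `getCause()`.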



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4568) Replace ActorGateway in WebRuntimeMonitor

2016-09-02 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4568:


 Summary: Replace ActorGateway in WebRuntimeMonitor
 Key: FLINK-4568
 URL: https://issues.apache.org/jira/browse/FLINK-4568
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: Till Rohrmann


Currently the {{WebRuntimeMonitor}} uses the {{JobManagerRetriever}}, which 
returns an {{ActorGateway}} to the {{JobManager}}. This should be replaced by an 
interface to avoid merge conflicts with the Flip-6 refactorings.





[jira] [Created] (FLINK-4567) Enhance SerializedThrowable to properly preserve cause chains

2016-09-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4567:
---

 Summary: Enhance SerializedThrowable to properly preserve cause 
chains
 Key: FLINK-4567
 URL: https://issues.apache.org/jira/browse/FLINK-4567
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0








[jira] [Assigned] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-02 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-4348:
-

Assignee: Maximilian Michels  (was: zhangjing)

> Implement slot allocation protocol with TaskExecutor
> 
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Maximilian Michels
>
> When the slotManager finds a suitable slot in the free pool for a slot request, 
> it marks the slot as occupied and then tells the taskExecutor to give 
> the slot to the specified JobMaster. 
> A slot request sent to the taskExecutor should contain the following 
> parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID. 
> The taskExecutor can respond in one of three ways; below we discuss when 
> each one happens and how to handle it.
> 1. Ack: the taskExecutor gives the slot to the specified JobMaster as 
> expected.
> 2. Decline: the slot is already occupied by another AllocationID.
> 3. Timeout: caused by a lost request message, a lost response message, 
> or slow network transfer.
> In the first case, the ResourceManager needs to do nothing. In the 
> second and third cases, the ResourceManager needs to notify the slotManager, 
> which first verifies and clears all previous allocation information for 
> this slot request and then tries again to find a suitable slot for it. 
> This may cause duplicate allocation: for example, the slot request to a 
> TaskManager succeeds but the response is lost, so we request a slot in 
> another TaskManager, which results in two slots assigned to one request. 
> This can be taken care of by rejecting the registration at the JobMaster.
> Some questions still need further discussion.
> 1. Who sends the slotRequest to the taskExecutor, the SlotManager or the 
> ResourceManager? I think it is better that the SlotManager delegates the RPC 
> call to the ResourceManager whenever it needs to communicate with the outside 
> world. The ResourceManager knows which taskExecutor to send the request to, 
> based on the ResourceID. Besides, the call used to request a slot from the 
> taskExecutor should not be an RpcMethod, because only the SlotManager should 
> be allowed to call it; other components, for example the JobMaster and the 
> TaskExecutor, must not call it directly.
> 2. If the JobMaster rejects the slot offer from a TaskExecutor, should the 
> TaskExecutor notify the ResourceManager of the free slot immediately, or wait 
> for the next heartbeat sync? The advantage of the first way is that the 
> resourceManager's view is updated faster. The advantage of the second way is 
> that it saves an RPC method in the ResourceManager.
> 3. There are two possible communication types. First, the slot request could 
> be sent as an ask operation where the response is returned as a future. 
> Second, the resourceManager could send the slot request in a fire-and-forget 
> way, with the response returned by a separate RPC call. I prefer the first 
> one because it is simpler and saves an RPC method in the ResourceManager (for 
> the callback in the second way).
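
The three response cases described in the issue could be handled roughly as sketched below. This is an illustration only: `Response`, `handleResponse`, and the retry queue are hypothetical names, not part of the Flink code base.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SlotRequestSketch {

    /** Hypothetical outcome of a slot request sent to a TaskExecutor. */
    public enum Response { ACK, DECLINE, TIMEOUT }

    /**
     * Returns true if the allocation is finished. On DECLINE or TIMEOUT the
     * allocation ID is queued again so that another slot can be searched for.
     */
    public static boolean handleResponse(
            Response response, String allocationId, Deque<String> retryQueue) {
        switch (response) {
            case ACK:
                // The slot was handed to the JobMaster; nothing more to do.
                return true;
            case DECLINE:
            case TIMEOUT:
                // Previous allocation info would be cleared here before retrying.
                retryQueue.addLast(allocationId);
                return false;
            default:
                throw new IllegalStateException("unknown response: " + response);
        }
    }

    public static void main(String[] args) {
        Deque<String> retryQueue = new ArrayDeque<>();
        System.out.println(handleResponse(Response.ACK, "alloc-1", retryQueue));
        System.out.println(handleResponse(Response.TIMEOUT, "alloc-2", retryQueue));
        System.out.println("to retry: " + retryQueue);
    }
}
```

A retried request may succeed twice if only the response was lost, which is why the description relies on the JobMaster rejecting the surplus slot.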





[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

2016-09-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2449
  
Looks good to me, +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458101#comment-15458101
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2449
  
Looks good to me, +1


> Replace ActorGateways in NetworkEnvironment by interfaces
> -
>
> Key: FLINK-4455
> URL: https://issues.apache.org/jira/browse/FLINK-4455
> Project: Flink
>  Issue Type: Improvement
>  Components: Network, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world 
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the 
> dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement 
> these interfaces as part of their RPC contract.
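
The proposal can be sketched as follows: the component depends on a narrow interface, and an actor-backed implementation wraps each call into a message. All names here (`PartitionNotifier`, `MessageSender`, `PartitionConsumableMessage`) are hypothetical and chosen for illustration; they are not the interfaces introduced by the pull request.

```java
import java.util.ArrayList;
import java.util.List;

public class GatewayInterfaceSketch {

    /** Hypothetical narrow interface the network component would depend on. */
    public interface PartitionNotifier {
        void notifyPartitionConsumable(String jobId, String partitionId);
    }

    /** Hypothetical stand-in for an actor gateway: anything that can send a message. */
    public interface MessageSender {
        void tell(Object message);
    }

    /** Hypothetical message carrying the method arguments. */
    public static final class PartitionConsumableMessage {
        public final String jobId;
        public final String partitionId;

        public PartitionConsumableMessage(String jobId, String partitionId) {
            this.jobId = jobId;
            this.partitionId = partitionId;
        }
    }

    /** Actor-backed implementation: wraps the call in a message and sends it. */
    public static final class ActorBackedPartitionNotifier implements PartitionNotifier {
        private final MessageSender sender;

        public ActorBackedPartitionNotifier(MessageSender sender) {
            this.sender = sender;
        }

        @Override
        public void notifyPartitionConsumable(String jobId, String partitionId) {
            sender.tell(new PartitionConsumableMessage(jobId, partitionId));
        }
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        PartitionNotifier notifier = new ActorBackedPartitionNotifier(sent::add);
        notifier.notifyPartitionConsumable("job-1", "partition-7");
        System.out.println("messages sent: " + sent.size());
    }
}
```

An RPC-based endpoint could implement the same interface directly, so the calling component would not need to change.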





[GitHub] flink issue #2447: [FLINK-4490] [distributed coordination] Decouple the JobM...

2016-09-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2447
  
We can do either that follow-up, or we can add an RPC gateway in the 
`flip-6` branch and remove the ActorGateway after merging the feature and 
master branch.

Will address the comments and merge...




[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

2016-09-02 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2461

[FLINK-4505][Cluster Management] Implement TaskManagerFactory to brin…

Implement TaskExecutorFactory that should be an abstract class with the 
helper methods to bring up the TaskManager. The factory can be implemented by 
some classes to start a TaskManager in different modes (testing, standalone, 
yarn).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4505

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2461


commit 21c4b408bb0138b41e49a439f6d07f9a0ca9
Author: 淘江 
Date:   2016-09-02T10:00:49Z

[FLINK-4505][Cluster Management] Implement TaskManagerFactory to bring up 
TaskManager for different modes

Summary: above

Test Plan: NA

Reviewers: kete.yangkt

Subscribers: #blink

Differential Revision: http://phabricator.taobao.net/D5606






[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2449
  
Tests have passed locally. Will rebase, check again on Travis, and then merge 
the PR.




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458119#comment-15458119
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2447
  
We can do either that follow-up, or we can add an RPC gateway in the 
`flip-6` branch and remove the ActorGateway after merging the feature and 
master branch.

Will address the comments and merge...


> Decouple Slot and Instance
> --
>
> Key: FLINK-4490
> URL: https://issues.apache.org/jira/browse/FLINK-4490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Kurt Young
> Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} hold references to each other. For 
> {{Instance}} holding {{Slot}}, this makes sense because it reflects how many 
> resources the instance can provide and how many are in use. 
> But it is not really necessary for {{Slot}} to hold the {{Instance}} it 
> belongs to. It only needs to hold some connection information and a gateway to 
> talk to. Another downside of {{Slot}} holding {{Instance}} is that 
> {{Instance}} contains some allocation/de-allocation logic, so it will 
> be difficult to refactor the allocation without affecting 
> {{Slot}}. 
> We should abstract the connection information of {{Instance}} and let {{Slot}} 
> hold that. (We already have {{InstanceConnectionInfo}}, but it lacks the 
> instance's akka gateway; maybe we can just add the akka gateway to 
> {{InstanceConnectionInfo}}.)





[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458121#comment-15458121
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2461

[FLINK-4505][Cluster Management] Implement TaskManagerFactory to brin…

Implement TaskExecutorFactory that should be an abstract class with the 
helper methods to bring up the TaskManager. The factory can be implemented by 
some classes to start a TaskManager in different modes (testing, standalone, 
yarn).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4505

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2461


commit 21c4b408bb0138b41e49a439f6d07f9a0ca9
Author: 淘江 
Date:   2016-09-02T10:00:49Z

[FLINK-4505][Cluster Management] Implement TaskManagerFactory to bring up 
TaskManager for different modes

Summary: above

Test Plan: NA

Reviewers: kete.yangkt

Subscribers: #blink

Differential Revision: http://phabricator.taobao.net/D5606




> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458118#comment-15458118
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2449
  
Tests have passed locally. Will rebase, check again on Travis, and then merge 
the PR.


> Replace ActorGateways in NetworkEnvironment by interfaces
> -
>
> Key: FLINK-4455
> URL: https://issues.apache.org/jira/browse/FLINK-4455
> Project: Flink
>  Issue Type: Improvement
>  Components: Network, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world 
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the 
> dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement 
> these interfaces as part of their RPC contract.





[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
Will rebase on the latest master and #2449 to run the travis tests again.




[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458128#comment-15458128
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
Will rebase on the latest master and #2449 to run the travis tests again.


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.
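
The port-0 behaviour the description relies on can be shown with the standard `java.net.ServerSocket` API. This is a minimal standalone sketch, not Flink code; the class and method names are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {

    /** Opens two server sockets on port 0 and returns the ports the OS assigned. */
    public static int[] twoFreePorts() {
        try (ServerSocket first = new ServerSocket(0);
             ServerSocket second = new ServerSocket(0)) {
            // Both sockets are open at the same time, so the ports cannot collide.
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ports = twoFreePorts();
        System.out.println("OS-assigned ports: " + ports[0] + " and " + ports[1]);
    }
}
```

Because the operating system picks the port at bind time, concurrently running tests no longer need to agree on a port range in advance.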





[jira] [Created] (FLINK-4569) JobRetrievalITCase.testJobRetrieval() does not forward exceptions to parent thread.

2016-09-02 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4569:
-

 Summary: JobRetrievalITCase.testJobRetrieval() does not forward 
exceptions to parent thread.
 Key: FLINK-4569
 URL: https://issues.apache.org/jira/browse/FLINK-4569
 Project: Flink
  Issue Type: Bug
  Components: Client
Reporter: Robert Metzger


The mentioned test seems to fail frequently, without being detected, because 
the Assert.fail() is called in a separate thread which doesn't forward 
exceptions.
https://s3.amazonaws.com/archive.travis-ci.org/jobs/156177995/log.txt
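
One common way to make such failures visible is to capture the throwable in the worker thread and rethrow it from the thread that started it. The sketch below shows that pattern with a hypothetical helper `runAndForward`; it is not the fix that was applied to the test.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadFailureForwardingSketch {

    /** Runs the body in a new thread and rethrows any failure in the calling thread. */
    public static void runAndForward(Runnable body) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                body.run();
            } catch (Throwable t) {
                // Without this, an AssertionError would only terminate the worker thread.
                failure.set(t);
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        Throwable t = failure.get();
        if (t != null) {
            throw new AssertionError("worker thread failed", t);
        }
    }

    public static void main(String[] args) {
        try {
            runAndForward(() -> {
                throw new AssertionError("job retrieval failed");
            });
        } catch (AssertionError e) {
            System.out.println("forwarded: " + e.getCause().getMessage());
        }
    }
}
```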





[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-02 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
Running some last tests before merging.




[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458147#comment-15458147
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
Running some last tests before merging.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  





[GitHub] flink issue #2418: [FLINK-4245] JMXReporter exposes all defined variables

2016-09-02 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2418
  
I've updated the PR.
* reverted back to creating a new Hashtable and filtering all keys and 
values
* introduced the notion of a "logical scope"; in other words, the group 
hierarchy without specific information such as IDs. Example: 
"taskmanager.job.task.usergroup". The order is strictly the group hierarchy and 
is unaffected by the scope formats.




[jira] [Commented] (FLINK-4245) Metric naming improvements

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458151#comment-15458151
 ] 

ASF GitHub Bot commented on FLINK-4245:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2418
  
I've updated the PR.
* reverted back to creating a new Hashtable and filtering all keys and 
values
* introduced the notion of a "logical scope"; in other words, the group 
hierarchy without specific information such as IDs. Example: 
"taskmanager.job.task.usergroup". The order is strictly the group hierarchy and 
is unaffected by the scope formats.


> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we 
> can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?
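
As an illustration of the domain/name/tags layout described in the issue, the sketch below builds a `javax.management.ObjectName` from a tag map. The helper `toObjectName` is hypothetical and not the JMXReporter implementation; values are passed through `ObjectName.quote` because tag values such as "map() at X.java:123" contain characters that are not allowed unquoted.

```java
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class JmxTagsSketch {

    /** Builds an ObjectName with the metric name and all tags as key properties. */
    public static ObjectName toObjectName(
            String domain, String metricName, Map<String, String> tags) {
        Hashtable<String, String> properties = new Hashtable<>();
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            // Quoting makes characters like ':' and ',' legal inside a value.
            properties.put(tag.getKey(), ObjectName.quote(tag.getValue()));
        }
        // Added last so that a tag called "name" cannot shadow the metric name.
        properties.put("name", ObjectName.quote(metricName));
        try {
            return new ObjectName(domain, properties);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid domain or tag key", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("hostname", "localhost");
        tags.put("operator_name", "map() at X.java:123");
        ObjectName name = toObjectName("taskmanager.task.operator.io", "numRecordsIn", tags);
        System.out.println(name);
    }
}
```

Consumers can read a tag back with `getKeyProperty` and `ObjectName.unquote` instead of parsing the formatted scope.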





[jira] [Commented] (FLINK-2312) Random Splits

2016-09-02 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458159#comment-15458159
 ] 

Simone Robutti commented on FLINK-2312:
---

Should we reconsider this implementation?

> Random Splits
> -
>
> Key: FLINK-2312
> URL: https://issues.apache.org/jira/browse/FLINK-2312
> Project: Flink
>  Issue Type: Wish
>  Components: Machine Learning Library
>Reporter: Maximilian Alber
>Assignee: pietro pinoli
>Priority: Minor
>
> In machine learning applications it is common to split a data set into, e.g., 
> a training set and a test set.
> To the best of my knowledge, there is currently no convenient way in Flink to 
> split a data set randomly into several partitions according to some ratio.
> The desired semantics would be the same as those of Spark's RDD randomSplit.
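
The requested semantics can be illustrated on a plain in-memory list. The sketch below is an assumption about what such a split does, written against `java.util` only; it is neither Flink's nor Spark's implementation, and `randomSplit` here is a hypothetical helper. Each element is assigned independently, so the resulting sizes only approximate the requested ratios.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class RandomSplitSketch {

    /** Splits data into weights.length parts; each element lands in part i with probability weights[i] / sum. */
    public static <T> List<List<T>> randomSplit(List<T> data, double[] weights, long seed) {
        double total = 0.0;
        for (double w : weights) {
            if (w < 0.0) {
                throw new IllegalArgumentException("weights must be non-negative");
            }
            total += w;
        }
        if (total <= 0.0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }

        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            parts.add(new ArrayList<>());
        }

        Random random = new Random(seed);
        for (T element : data) {
            double x = random.nextDouble() * total;
            double cumulative = 0.0;
            int target = weights.length - 1; // fallback guards against rounding at the upper end
            for (int i = 0; i < weights.length; i++) {
                cumulative += weights[i];
                if (x < cumulative) {
                    target = i;
                    break;
                }
            }
            parts.get(target).add(element);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            data.add(i);
        }
        List<List<Integer>> parts = randomSplit(data, new double[] {0.8, 0.2}, 42L);
        System.out.println("training: " + parts.get(0).size() + ", test: " + parts.get(1).size());
    }
}
```

Fixing the seed makes the split reproducible, which is usually what training/test experiments need.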





[jira] [Commented] (FLINK-4547) Return same object when call connect method in AkkaRpcService using same address and same rpc gateway class

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458168#comment-15458168
 ] 

ASF GitHub Bot commented on FLINK-4547:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77326124
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -189,7 +191,49 @@ public void stop() {
rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
}
 
-   // 

+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null) {
+   return false;
+   }
+
+   if(Proxy.isProxyClass(o.getClass())) {
+   return o.equals(this);
+   }
--- End diff --

Why should the `AkkaInvocationHandler` be a proxy class?


> Return same object when call connect method in AkkaRpcService using same 
> address and same rpc gateway class
> ---
>
> Key: FLINK-4547
> URL: https://issues.apache.org/jira/browse/FLINK-4547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: zhangjing
>Assignee: zhangjing
>
> Currently, every call to the connect method of the AkkaRpcService class with 
> the same address and the same rpc gateway class returns a completely different 
> gateway object; the objects are not equal and their hash codes differ. 
> It may be reasonable to get the same result (equals returns true and the 
> hash codes are the same) when using the same address and the same Gateway class.





[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77326124
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -189,7 +191,49 @@ public void stop() {
rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
}
 
-   // 

+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null) {
+   return false;
+   }
+
+   if(Proxy.isProxyClass(o.getClass())) {
+   return o.equals(this);
+   }
--- End diff --

Why should the `AkkaInvocationHandler` be a proxy class?




[GitHub] flink issue #2455: [FLINK-4547] [cluster management] Return same object when...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2455
  
I think the subject line of this PR is a little bit misleading. The 
`RpcService` won't return the same gateway object upon calling `connect` with 
the same parameters. But the returned gateways are equal with respect to 
`equals` and `hashCode`.

Maybe the JIRA and the PR subject line should be corrected to better 
reflect the actual changes.
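
The distinction between "same object" and "equal objects" can be shown with plain `java.lang.reflect.Proxy`. The sketch below is a standalone illustration under assumed names (`DemoGateway`, `AddressHandler`, `connect`); it is not the `AkkaInvocationHandler` from the pull request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;

public class GatewayEqualitySketch {

    /** Hypothetical gateway interface. */
    public interface DemoGateway {
        String getAddress();
    }

    /** Handler whose equality is defined by address and gateway class. */
    static final class AddressHandler implements InvocationHandler {
        private final String address;
        private final Class<?> gatewayClass;

        AddressHandler(String address, Class<?> gatewayClass) {
            this.address = address;
            this.gatewayClass = gatewayClass;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // equals, hashCode and toString of a proxy are routed to the handler.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":
                        return isSameGateway(args[0]);
                    case "hashCode":
                        return Objects.hash(address, gatewayClass);
                    default:
                        return gatewayClass.getSimpleName() + "@" + address;
                }
            }
            if (method.getName().equals("getAddress")) {
                return address;
            }
            throw new UnsupportedOperationException(method.getName());
        }

        private boolean isSameGateway(Object other) {
            if (other == null || !Proxy.isProxyClass(other.getClass())) {
                return false;
            }
            InvocationHandler handler = Proxy.getInvocationHandler(other);
            if (!(handler instanceof AddressHandler)) {
                return false;
            }
            AddressHandler that = (AddressHandler) handler;
            return address.equals(that.address) && gatewayClass == that.gatewayClass;
        }
    }

    /** Creates a new proxy on every call, like a connect that does not cache. */
    public static <T> T connect(String address, Class<T> gatewayClass) {
        Object proxy = Proxy.newProxyInstance(
                gatewayClass.getClassLoader(),
                new Class<?>[] {gatewayClass},
                new AddressHandler(address, gatewayClass));
        return gatewayClass.cast(proxy);
    }

    public static void main(String[] args) {
        DemoGateway first = connect("akka://demo/user/rm", DemoGateway.class);
        DemoGateway second = connect("akka://demo/user/rm", DemoGateway.class);
        System.out.println("same object: " + (first == second));
        System.out.println("equal: " + first.equals(second));
    }
}
```

In this sketch the proxy check lives on the handler's side of the comparison: the handler unwraps the other proxy and compares handler state, rather than treating the handler itself as a proxy.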




[jira] [Commented] (FLINK-4547) Return same object when call connect method in AkkaRpcService using same address and same rpc gateway class

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458179#comment-15458179
 ] 

ASF GitHub Bot commented on FLINK-4547:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2455
  
I think the subject line of this PR is a little bit misleading. The 
`RpcService` won't return the same gateway object upon calling `connect` with 
the same parameters. But the returned gateways are equal with respect to 
`equals` and `hashCode`.

Maybe the JIRA and the PR subject line should be corrected to better 
reflect the actual changes.


> Return same object when call connect method in AkkaRpcService using same 
> address and same rpc gateway class
> ---
>
> Key: FLINK-4547
> URL: https://issues.apache.org/jira/browse/FLINK-4547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: zhangjing
>Assignee: zhangjing
>
> Currently, every call to the connect method of the AkkaRpcService class with 
> the same address and the same rpc gateway class returns a completely different 
> gateway object; the objects are not equal and their hash codes differ. 
> It may be reasonable to get the same result (equals returns true and the 
> hash codes are the same) when using the same address and the same Gateway class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77326942
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 ---
@@ -84,4 +89,35 @@ public void run() {
 
assertTrue("call was not properly delayed", ((stop - start) / 
100) >= delay);
}
+
+   /**
+* Test connect method
+* 1. Get the same result when connect the same address and same 
gateway class
+* 2. Failed when connect to invalid address
+* @throws Exception
+*/
+   @Test
+   public void testConnect() throws Exception {
+   TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+   TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+   
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+   ResourceManager rm = new ResourceManager(akkaRpcService, 
highAvailabilityServices);
+   rm.start();
+   String address = rm.getAddress();
+   // verify get the same result when connect the same address and 
same gateway class
+   Future rmGatewayFuture1 = 
akkaRpcService.connect(address, ResourceManagerGateway.class);
+   ResourceManagerGateway rmGateway1 = 
Await.result(rmGatewayFuture1, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+   Future rmGatewayFuture2 = 
akkaRpcService.connect(address, ResourceManagerGateway.class);
+   ResourceManagerGateway rmGateway2 = 
Await.result(rmGatewayFuture2, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+   Assert.assertEquals(rmGateway1, rmGateway2);
+   Assert.assertEquals(rmGateway1.hashCode(), 
rmGateway2.hashCode());
+
+   // verify failed when connect to invalid address
+   String invalidString = "abc";
+   Future invalidRmGatewayFuture = 
akkaRpcService.connect(invalidString, ResourceManagerGateway.class);
+   assertTrue(invalidRmGatewayFuture.failed().value().get().get() 
instanceof RuntimeException);
--- End diff --

Maybe you should check whether the future is completed. You could obtain 
the result via `Await.result(future, timeout)`.
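
One way to read this suggestion, sketched with plain `java.util.concurrent.CompletableFuture` rather than the Scala `Future` and `Await.result` used in the diff: block with a timeout until the future has completed, and only then inspect the failure. `AwaitFailureSketch`, `connect` and `awaitFailure` are hypothetical names for this illustration and make no claim about how `AkkaRpcService.connect` behaves.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitFailureSketch {

    // Hypothetical stand-in for an asynchronous connect that fails for
    // addresses without the expected prefix.
    static CompletableFuture<String> connect(String address) {
        CompletableFuture<String> result = new CompletableFuture<>();
        new Thread(() -> {
            if (address.startsWith("akka")) {
                result.complete("gateway@" + address);
            } else {
                result.completeExceptionally(
                    new RuntimeException("Invalid address: " + address));
            }
        }).start();
        return result;
    }

    // Waits until the future has completed (or the timeout expires) and
    // returns the failure cause, or null if the future succeeded.
    static Throwable awaitFailure(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (TimeoutException e) {
            throw new IllegalStateException("future did not complete in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

Unlike reading the future's value directly, the blocking wait cannot observe a not-yet-completed future, so an assertion on the returned cause does not depend on timing.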




[jira] [Commented] (FLINK-4547) Return same object when call connect method in AkkaRpcService using same address and same rpc gateway class

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458182#comment-15458182
 ] 

ASF GitHub Bot commented on FLINK-4547:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77326942
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 ---
@@ -84,4 +89,35 @@ public void run() {
 
assertTrue("call was not properly delayed", ((stop - start) / 
100) >= delay);
}
+
+   /**
+* Test connect method
+* 1. Get the same result when connect the same address and same 
gateway class
+* 2. Failed when connect to invalid address
+* @throws Exception
+*/
+   @Test
+   public void testConnect() throws Exception {
+   TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+   TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+   
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+   ResourceManager rm = new ResourceManager(akkaRpcService, 
highAvailabilityServices);
+   rm.start();
+   String address = rm.getAddress();
+   // verify get the same result when connect the same address and 
same gateway class
+   Future rmGatewayFuture1 = 
akkaRpcService.connect(address, ResourceManagerGateway.class);
+   ResourceManagerGateway rmGateway1 = 
Await.result(rmGatewayFuture1, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+   Future rmGatewayFuture2 = 
akkaRpcService.connect(address, ResourceManagerGateway.class);
+   ResourceManagerGateway rmGateway2 = 
Await.result(rmGatewayFuture2, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+   Assert.assertEquals(rmGateway1, rmGateway2);
+   Assert.assertEquals(rmGateway1.hashCode(), 
rmGateway2.hashCode());
+
+   // verify failed when connect to invalid address
+   String invalidString = "abc";
+   Future invalidRmGatewayFuture = 
akkaRpcService.connect(invalidString, ResourceManagerGateway.class);
+   assertTrue(invalidRmGatewayFuture.failed().value().get().get() 
instanceof RuntimeException);
--- End diff --

Maybe you should check whether the future is completed. You could obtain 
the result via `Await.result(future, timeout)`.


> Return same object when call connect method in AkkaRpcService using same 
> address and same rpc gateway class
> ---
>
> Key: FLINK-4547
> URL: https://issues.apache.org/jira/browse/FLINK-4547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: zhangjing
>Assignee: zhangjing
>
> Currently, every call to the connect method of the AkkaRpcService class with 
> the same address and the same rpc gateway class returns a gateway object that 
> is unrelated to the previous ones: their equals and hashCode results differ. 
> It may be reasonable to return an equal result (equals returns true and the 
> hashCode is the same) when using the same address and the same gateway class.





[jira] [Commented] (FLINK-2312) Random Splits

2016-09-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458190#comment-15458190
 ] 

Till Rohrmann commented on FLINK-2312:
--

We've merged FLINK-2259 which might already fulfil your needs wrt random 
splitting.

> Random Splits
> -
>
> Key: FLINK-2312
> URL: https://issues.apache.org/jira/browse/FLINK-2312
> Project: Flink
>  Issue Type: Wish
>  Components: Machine Learning Library
>Reporter: Maximilian Alber
>Assignee: pietro pinoli
>Priority: Minor
>
> In machine learning applications it is common to split data sets into, for 
> example, a training and a testing set.
> To the best of my knowledge there is currently no convenient way in Flink to 
> split a data set randomly into several partitions according to some ratio.
> The desired semantics would be the same as those of Spark's RDD randomSplit.
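
The requested behaviour, splitting a collection randomly into partitions according to a ratio, can be sketched in plain Java as follows. This is only an illustration on an in-memory `List`; `RandomSplitSketch` and `randomSplit` are hypothetical names, and the sketch is neither the Flink functionality merged with FLINK-2259 nor Spark's implementation. Each element is assigned independently, so partition sizes match the ratio only in expectation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class RandomSplitSketch {

    // Assigns every element to one partition, chosen with probability
    // proportional to the corresponding weight. A fixed seed makes the
    // split reproducible.
    static <T> List<List<T>> randomSplit(List<T> data, double[] weights, long seed) {
        if (weights.length == 0) {
            throw new IllegalArgumentException("at least one weight is required");
        }
        double total = 0.0;
        for (double weight : weights) {
            if (weight < 0.0) {
                throw new IllegalArgumentException("weights must be non-negative");
            }
            total += weight;
        }
        if (total <= 0.0) {
            throw new IllegalArgumentException("weights must sum to a positive value");
        }

        // Cumulative upper bounds of the normalized weights.
        double[] bounds = new double[weights.length];
        double running = 0.0;
        for (int i = 0; i < weights.length; i++) {
            running += weights[i] / total;
            bounds[i] = running;
        }

        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            partitions.add(new ArrayList<>());
        }

        Random random = new Random(seed);
        for (T element : data) {
            double draw = random.nextDouble();
            // Default to the last partition in case rounding leaves the
            // final bound slightly below 1.0.
            int target = weights.length - 1;
            for (int i = 0; i < bounds.length; i++) {
                if (draw < bounds[i]) {
                    target = i;
                    break;
                }
            }
            partitions.get(target).add(element);
        }
        return partitions;
    }
}
```

For a training/testing split one would call, for example, `randomSplit(data, new double[] {0.8, 0.2}, 42L)` and use the first partition for training.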





[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2461#discussion_r77327741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An factory for creating {@link TaskExecutor} and starting it in yarn 
mode.
+ */
+public class YarnTaskExecutorFactory extends TaskExecutorFactory {
+
+   public YarnTaskExecutorFactory(Configuration configuration, ResourceID 
resourceID) {
+   super(configuration, resourceID);
+   }
+
+   @Override
+   public TaskExecutor createAndStartTaskExecutor() throws Exception {
+   return selectNetworkInterfaceAndRunTaskManager(configuration, 
resourceID);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param resourceID   The id of the resource which the task 
manager will run on.
+*/
+   private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
--- End diff --

Why does only the Yarn factory select the network interface, and not the 
standalone implementation?




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458199#comment-15458199
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2461#discussion_r77327741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An factory for creating {@link TaskExecutor} and starting it in yarn 
mode.
+ */
+public class YarnTaskExecutorFactory extends TaskExecutorFactory {
+
+   public YarnTaskExecutorFactory(Configuration configuration, ResourceID 
resourceID) {
+   super(configuration, resourceID);
+   }
+
+   @Override
+   public TaskExecutor createAndStartTaskExecutor() throws Exception {
+   return selectNetworkInterfaceAndRunTaskManager(configuration, 
resourceID);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param resourceID   The id of the resource which the task 
manager will run on.
+*/
+   private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
--- End diff --

Why does only the Yarn factory select the network interface, and not the 
standalone implementation?


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@mxm, I think the {{StandaloneTaskExecutorFactory}} can be used for mini cluster 
or testing mode, and the {{YarnTaskExecutorFactory}} for yarn mode. After you 
confirm the implementation, I can add some tests for the factory if needed.




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458203#comment-15458203
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@mxm, I think the {{StandaloneTaskExecutorFactory}} can be used for mini cluster 
or testing mode, and the {{YarnTaskExecutorFactory}} for yarn mode. After you 
confirm the implementation, I can add some tests for the factory if needed.


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

2016-09-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2461#discussion_r77329068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An factory for creating {@link TaskExecutor} and starting it in yarn 
mode.
+ */
+public class YarnTaskExecutorFactory extends TaskExecutorFactory {
+
+   public YarnTaskExecutorFactory(Configuration configuration, ResourceID 
resourceID) {
+   super(configuration, resourceID);
+   }
+
+   @Override
+   public TaskExecutor createAndStartTaskExecutor() throws Exception {
+   return selectNetworkInterfaceAndRunTaskManager(configuration, 
resourceID);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param resourceID   The id of the resource which the task 
manager will run on.
+*/
+   private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
--- End diff --

The parameters provided by the different modes differ, because I followed the 
previous implementation of **TaskManager**. For example, 
**YarnTaskManagerRunner** previously invoked the method 
"selectNetworkInterfaceAndRunTaskManager", and **ForkableFlinkMiniCluster** 
invoked the method "startTaskManagerComponentsAndActor" to start the 
**TaskManager**. So I retained the previous approach for each mode.




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458217#comment-15458217
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2461#discussion_r77329068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An factory for creating {@link TaskExecutor} and starting it in yarn 
mode.
+ */
+public class YarnTaskExecutorFactory extends TaskExecutorFactory {
+
+   public YarnTaskExecutorFactory(Configuration configuration, ResourceID 
resourceID) {
+   super(configuration, resourceID);
+   }
+
+   @Override
+   public TaskExecutor createAndStartTaskExecutor() throws Exception {
+   return selectNetworkInterfaceAndRunTaskManager(configuration, 
resourceID);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param resourceID   The id of the resource which the task 
manager will run on.
+*/
+   private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
--- End diff --

The parameters provided by the different modes differ, because I followed the 
previous implementation of **TaskManager**. For example, 
**YarnTaskManagerRunner** previously invoked the method 
"selectNetworkInterfaceAndRunTaskManager", and **ForkableFlinkMiniCluster** 
invoked the method "startTaskManagerComponentsAndActor" to start the 
**TaskManager**. So I retained the previous approach for each mode.


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
Thanks for the contribution @wangzhijiang999.

I think the abstraction is not right. The `StandaloneFactory` and the 
`YarnFactory` should actually only differ in the class of the `TaskExecutor` 
they start. Furthermore, I think that the network interface selection should 
not be part of the factory. Instead I would pass the hostname and port to the 
`createTaskManager(hostname, port)` method which constructs the `TaskManager`. 
Maybe it even makes sense to pass the `RpcService` via 
`createTaskManager(RpcService)` and defer the `RpcService` creation to the 
outer method that runs the interface selection.

The difference between the Standalone/Yarn and Testing factory is that the 
first two create the TM components whereas the `TestingFactory` is initialized 
with the `TaskManager` components.
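
The proposed shape could be sketched roughly as follows. Only the names `StandaloneFactory`, `YarnFactory`, `TestingFactory` and `createTaskManager(RpcService)` come from the comment above; every type and field in this block (`FactorySketch`, `Components`, `YarnTaskManager`, the nested `RpcService` interface) is a hypothetical placeholder, not Flink's actual classes.

```java
final class FactorySketch {

    // Placeholder for the RPC service, which is created outside the factory.
    interface RpcService {
        String getAddress();
    }

    // Placeholder for the bundle of TaskManager components
    // (memory manager, IO manager, network stack, ...).
    static final class Components {
        final String origin;

        Components(String origin) {
            this.origin = origin;
        }
    }

    static class TaskManager {
        final RpcService rpcService;
        final Components components;

        TaskManager(RpcService rpcService, Components components) {
            this.rpcService = rpcService;
            this.components = components;
        }
    }

    static final class YarnTaskManager extends TaskManager {
        YarnTaskManager(RpcService rpcService, Components components) {
            super(rpcService, components);
        }
    }

    interface TaskManagerFactory {
        TaskManager createTaskManager(RpcService rpcService);
    }

    // Standalone and Yarn factories create the components themselves and
    // differ only in the class they instantiate.
    static final class StandaloneFactory implements TaskManagerFactory {
        @Override
        public TaskManager createTaskManager(RpcService rpcService) {
            return new TaskManager(rpcService, new Components("created by factory"));
        }
    }

    static final class YarnFactory implements TaskManagerFactory {
        @Override
        public TaskManager createTaskManager(RpcService rpcService) {
            return new YarnTaskManager(rpcService, new Components("created by factory"));
        }
    }

    // The testing factory is initialized with pre-built components.
    static final class TestingFactory implements TaskManagerFactory {
        private final Components components;

        TestingFactory(Components components) {
            this.components = components;
        }

        @Override
        public TaskManager createTaskManager(RpcService rpcService) {
            return new TaskManager(rpcService, components);
        }
    }
}
```

In this sketch none of the factories knows anything about hostnames or network interfaces; whoever calls `createTaskManager` supplies an already constructed `RpcService`.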




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458228#comment-15458228
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
Thanks for the contribution @wangzhijiang999.

I think the abstraction is not right. The `StandaloneFactory` and the 
`YarnFactory` should actually only differ in the class of the `TaskExecutor` 
they start. Furthermore, I think that the network interface selection should 
not be part of the factory. Instead I would pass the hostname and port to the 
`createTaskManager(hostname, port)` method which constructs the `TaskManager`. 
Maybe it even makes sense to pass the `RpcService` via 
`createTaskManager(RpcService)` and defer the `RpcService` creation to the 
outer method that runs the interface selection.

The difference between the Standalone/Yarn and Testing factory is that the 
first two create the TM components whereas the `TestingFactory` is initialized 
with the `TaskManager` components.


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, the current implementation just follows the previous process 
for yarn and standalone modes. I agree with your opinion; the current approach 
is indeed not very clear. I think we should first re-define and confirm the 
parameters passed to the "createTaskManager" method for the different 
factories; then I can fix the inner processes based on the input parameters. 
Could you suggest the specific parameters to pass for the different factories? 
Thank you!




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458249#comment-15458249
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, the current implementation just follows the previous process 
for yarn and standalone modes. I agree with your opinion; the current approach 
is indeed not very clear. I think we should first re-define and confirm the 
parameters passed to the "createTaskManager" method for the different 
factories; then I can fix the inner processes based on the input parameters. 
Could you suggest the specific parameters to pass for the different factories? 
Thank you!


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
I think we don't need the distinction between standalone and yarn in the 
TaskManager startup. They should actually be the same. The difference will lie 
in the runners: `StandaloneRunner`, `YarnRunner` and `MesosRunner`.

I think the factory should be responsible for creating the `TaskManager` 
components. I would pull out the `RpcService` creation and, thus, also the 
network interface selection, because this is code which can be re-used across 
different components (TM and JM, for example).
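
A rough sketch of what pulling the `RpcService` creation out of the factory might look like on the runner side. All names here (`RunnerSketch`, `selectHostname`, `createRpcService`, `ComponentFactory`, `run`) are hypothetical, and the hostname selection is reduced to a trivial fallback; the real selection logic that tries out several interfaces is not reproduced.

```java
final class RunnerSketch {

    // Placeholder for the RPC service shared by different components.
    interface RpcService {
        String getAddress();
    }

    // Placeholder for a factory that builds a component (TM, JM, ...)
    // on top of an existing RpcService.
    interface ComponentFactory<T> {
        T create(RpcService rpcService);
    }

    // Simplified stand-in for network interface selection: use the
    // configured hostname if present, otherwise fall back to "localhost".
    static String selectHostname(String configuredHostname) {
        if (configuredHostname != null && !configuredHostname.isEmpty()) {
            return configuredHostname;
        }
        return "localhost";
    }

    // Reusable helper: the only place that knows about hostnames and ports.
    static RpcService createRpcService(String configuredHostname, int port) {
        final String address = selectHostname(configuredHostname) + ":" + port;
        return () -> address;
    }

    // A runner selects the interface, creates the RpcService and hands it
    // to whichever component factory it was given.
    static <T> T run(String configuredHostname, int port, ComponentFactory<T> factory) {
        return factory.create(createRpcService(configuredHostname, port));
    }
}
```

Because `createRpcService` is independent of the component being started, the same helper can serve a TaskManager runner and a JobManager runner alike, which is the re-use argument made above.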




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458340#comment-15458340
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
I think we don't need the distinction between standalone and yarn in the 
TaskManager startup. They should actually be the same. The difference will lie 
in the runners: `StandaloneRunner`, `YarnRunner` and `MesosRunner`.

I think the factory should be responsible for creating the `TaskManager` 
components. I would pull out the `RpcService` creation and, thus, also the 
network interface selection, because this is code which can be re-used across 
different components (TM and JM, for example).


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
--- End diff --

"that the received", "is not the same as"




[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335925
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
--- End diff --

I think it's ok to name this exception `LeaderSessionIDException`




[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335961
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
+
+   private static final long serialVersionUID = -3276145308053264636L;
+
+   /** expected leader session id */
+   private final UUID expectedLeaderSessionID;
+
+   /** actual leader session id */
+   private final UUID actualLeaderSessionID;
+
+   public UnmatchedLeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
+       super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+       this.expectedLeaderSessionID = expectedLeaderSessionID;
--- End diff --

`checkNotNull`
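A sketch of how the exception might look with the review comments on this file applied (Javadoc wording, the shorter class name, and null checks in the constructor). It is an illustration, not the code that was merged: `Objects.requireNonNull` from the JDK stands in for Flink's `Preconditions.checkNotNull` to keep the example self-contained, and the `serialVersionUID` value and getter names are placeholders.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * An exception specifying that the received leader session ID is not the same as expected.
 */
class LeaderSessionIDException extends Exception {

    // Placeholder value for this sketch only.
    private static final long serialVersionUID = 1L;

    /** expected leader session id */
    private final UUID expectedLeaderSessionID;

    /** actual leader session id */
    private final UUID actualLeaderSessionID;

    LeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
        super("Unmatched leader session ID: expected " + expectedLeaderSessionID
            + ", actual " + actualLeaderSessionID);
        // JDK null check used here in place of Preconditions.checkNotNull.
        this.expectedLeaderSessionID =
            Objects.requireNonNull(expectedLeaderSessionID, "expectedLeaderSessionID");
        this.actualLeaderSessionID =
            Objects.requireNonNull(actualLeaderSessionID, "actualLeaderSessionID");
    }

    UUID getExpectedLeaderSessionID() {
        return expectedLeaderSessionID;
    }

    UUID getActualLeaderSessionID() {
        return actualLeaderSessionID;
    }
}
```

Passing `null` for either ID then fails fast with a `NullPointerException` at construction time instead of surfacing later in a getter's caller.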




[jira] [Commented] (FLINK-4535) ResourceManager registration with TaskExecutor

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458347#comment-15458347
 ] 

ASF GitHub Bot commented on FLINK-4535:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
--- End diff --

"that the received", "is not the same as"


> ResourceManager registration with TaskExecutor
> --
>
> Key: FLINK-4535
> URL: https://issues.apache.org/jira/browse/FLINK-4535
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
> When a TaskExecutor registers at the ResourceManager, it passes the following 
> 3 input parameters:
> 1. resourceManagerLeaderId: the fencing token for the ResourceManager leader, 
> as held by the TaskExecutor that sends the registration
> 2. taskExecutorAddress: the address of the TaskExecutor
> 3. resourceID: the resource ID of the TaskExecutor that registers
> The ResourceManager needs to process the registration event in the following 
> steps:
> 1. Check whether the input resourceManagerLeaderId is the same as the current 
> leadershipSessionId of the ResourceManager. If not, two or more 
> ResourceManagers may exist at the same time and the current ResourceManager 
> is not the proper one, so it rejects or ignores the registration.
> 2. Check whether a valid TaskExecutor exists at the given address by 
> connecting to the address. Reject registrations from invalid addresses.
> 3. Check whether it is a duplicate registration by the input resourceID; if 
> so, reject the registration.
> 4. Keep the resourceID to taskExecutorGateway mapping, and optionally keep 
> the resourceID to container mapping in YARN mode.
> 5. Create the connection between the ResourceManager and the TaskExecutor, and 
> ensure it is healthy based on heartbeat RPC calls between RM and TM?
> 6. Send a registration-successful ack to the TaskExecutor.
> Discussion:
> Maybe we need to introduce an errorCode or several registration-decline 
> subclasses to distinguish the different causes of a declined registration.
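
The registration steps in the description above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the implementation in the pull request: `RegistrationStatus`, `TaskExecutorGateway`, `ResourceManagerSketch` and `RegistrationDemo` are hypothetical names, the connection attempt is modelled as a plain function that returns `null` for an unreachable address, and heartbeat monitoring (step 5) is left out. The enum plays the role of the error code raised in the discussion point.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

/** Hypothetical outcome codes, one decline reason per check in the description. */
enum RegistrationStatus {
    SUCCESS,
    DECLINED_WRONG_LEADER,
    DECLINED_UNREACHABLE,
    DECLINED_DUPLICATE
}

/** Hypothetical stand-in for the gateway used to talk to a TaskExecutor. */
interface TaskExecutorGateway {
    String getAddress();
}

final class ResourceManagerSketch {
    private final UUID leaderSessionId;
    /** Connects to an address; returns null when no valid TaskExecutor answers. */
    private final Function<String, TaskExecutorGateway> connector;
    private final Map<String, TaskExecutorGateway> taskExecutors = new HashMap<>();

    ResourceManagerSketch(UUID leaderSessionId, Function<String, TaskExecutorGateway> connector) {
        this.leaderSessionId = leaderSessionId;
        this.connector = connector;
    }

    RegistrationStatus registerTaskExecutor(
            UUID resourceManagerLeaderId, String taskExecutorAddress, String resourceId) {
        // Step 1: the fencing token must match the current leader session ID.
        if (!leaderSessionId.equals(resourceManagerLeaderId)) {
            return RegistrationStatus.DECLINED_WRONG_LEADER;
        }
        // Step 2: verify that a TaskExecutor is reachable at the given address.
        TaskExecutorGateway gateway = connector.apply(taskExecutorAddress);
        if (gateway == null) {
            return RegistrationStatus.DECLINED_UNREACHABLE;
        }
        // Step 3: reject duplicate registrations for the same resource ID.
        if (taskExecutors.containsKey(resourceId)) {
            return RegistrationStatus.DECLINED_DUPLICATE;
        }
        // Step 4: remember the resourceID -> gateway mapping.
        taskExecutors.put(resourceId, gateway);
        // Step 5 (heartbeat monitoring) is omitted in this sketch.
        // Step 6: acknowledge the successful registration.
        return RegistrationStatus.SUCCESS;
    }
}

final class RegistrationDemo {
    /** Toy connector: any non-empty address counts as reachable. */
    static TaskExecutorGateway connect(String address) {
        if (address == null || address.isEmpty()) {
            return null;
        }
        return () -> address;
    }

    public static void main(String[] args) {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerSketch rm = new ResourceManagerSketch(leaderId, RegistrationDemo::connect);
        System.out.println(rm.registerTaskExecutor(leaderId, "host-1:6122", "tm-1"));
        System.out.println(rm.registerTaskExecutor(leaderId, "host-1:6122", "tm-1"));
    }
}
```

Returning a status code keeps the decline causes distinguishable for the caller; separate decline subclasses, as also suggested in the discussion, would carry the same information in the type instead.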



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


