[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422341#comment-15422341
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74886606
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create an HBase {@link Mutation} operation type
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActionList {
+   private final List<MutationAction> actions;
+
+   public MutationActionList() {
+   this.actions = new ArrayList<>();
+   }
+
+   public List<MutationAction> getActions() {
+   return this.actions;
+   }
+
+   /**
+* Create a new list of HBase {@link Mutation}s.
+*
+* @param rowKey row that the created {@link Mutation} list is applied to
+* @param writeToWAL enable WAL
+* @return a list of HBase {@link Mutation}s
+*/
+   public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
+   List<Mutation> mutations = new ArrayList<>();
+   Put put = null;
+   Increment increment = null;
+   Append append = null;
+   Delete delete = null;
+   boolean rowIsDeleted = false;
+   for (MutationAction action : actions) {
+   switch (action.getType()) {
+   case PUT:
+   if (put == null) {
+   put = new Put(rowKey);
+   mutations.add(put);
+   }
+   if (action.getTs() == -1) {
+   put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+   } else {
+   put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
+   }
+   break;
+
+   case INCREMENT:
+   if (increment == null) {
+   increment = new Increment(rowKey);
+   mutations.add(increment);
+   }
+   increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
+   break;
+
+   case APPEND:
+   if (append == null) {
+   append = new Append(rowKey);
+   mutations.add(append);
+   }
+   append.add(action.getFamily(), action.getQualifier(), action.getValue());
+   break;
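
For readers skimming the wrapped diff above: each branch is plain HBase client usage, building at most one Put/Increment/Append per row key and collecting them into a single list of Mutations. Below is a minimal, self-contained sketch of that pattern against the stock HBase 1.x client API; the column family and qualifier names are made up for illustration and are not part of the PR.

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MutationSketch {

	public static List<Mutation> mutationsFor(byte[] rowKey) {
		List<Mutation> mutations = new ArrayList<>();

		// PUT: set a cell value on the row.
		Put put = new Put(rowKey);
		put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("value"));
		mutations.add(put);

		// INCREMENT: bump a counter cell by 1.
		Increment increment = new Increment(rowKey);
		increment.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);
		mutations.add(increment);

		// APPEND: append bytes to an existing cell value.
		Append append = new Append(rowKey);
		append.add(Bytes.toBytes("cf"), Bytes.toBytes("log"), Bytes.toBytes("-suffix"));
		mutations.add(append);

		return mutations;
	}
}
{code}

The writeToWAL flag in the PR presumably maps to the imported Durability enum, but that part of the class is not visible in this excerpt.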

[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-16 Thread delding
Github user delding commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r74886606
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  This class represents a list of {@link MutationAction}s you will take when writing
+ *  an input value of {@link HBaseSink} to a row in an HBase table.
+ *  Each {@link MutationAction} can create an HBase {@link Mutation} operation type
+ *  including {@link Put}, {@link Increment}, {@link Append} and {@link Delete}.
+ */
+public class MutationActionList {
+   private final List<MutationAction> actions;
+
+   public MutationActionList() {
+   this.actions = new ArrayList<>();
+   }
+
+   public List<MutationAction> getActions() {
+   return this.actions;
+   }
+
+   /**
+* Create a new list of HBase {@link Mutation}s.
+*
+* @param rowKey row that the created {@link Mutation} list is applied to
+* @param writeToWAL enable WAL
+* @return a list of HBase {@link Mutation}s
+*/
+   public List<Mutation> newMutationList(byte[] rowKey, boolean writeToWAL) {
+   List<Mutation> mutations = new ArrayList<>();
+   Put put = null;
+   Increment increment = null;
+   Append append = null;
+   Delete delete = null;
+   boolean rowIsDeleted = false;
+   for (MutationAction action : actions) {
+   switch (action.getType()) {
+   case PUT:
+   if (put == null) {
+   put = new Put(rowKey);
+   mutations.add(put);
+   }
+   if (action.getTs() == -1) {
+   put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+   } else {
+   put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), action.getValue());
+   }
+   break;
+
+   case INCREMENT:
+   if (increment == null) {
+   increment = new Increment(rowKey);
+   mutations.add(increment);
+   }
+   increment.addColumn(action.getFamily(), action.getQualifier(), action.getIncrement());
+   break;
+
+   case APPEND:
+   if (append == null) {
+   append = new Append(rowKey);
+   mutations.add(append);
+   }
+   append.add(action.getFamily(), action.getQualifier(), action.getValue());
+   break;
+
+   // If there are multiple DELETE_ROW actions, only the first one is served
+   case DELETE_ROW:
+   if (!rowIsDeleted) {
+

[jira] [Commented] (FLINK-4393) Failed to serialize accumulators for task

2016-08-16 Thread Sajeev Ramakrishnan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422352#comment-15422352
 ] 

Sajeev Ramakrishnan commented on FLINK-4393:


The file I am processing is 3.6 GB and contains 20 million lines. This issue did not occur when I used flatMap.

> Failed to serialize accumulators for task
> -
>
> Key: FLINK-4393
> URL: https://issues.apache.org/jira/browse/FLINK-4393
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
> Environment: Redhat 6
>Reporter: Sajeev Ramakrishnan
>
> Dear Team,
>   I am getting the below exception while trying to use the Table API by 
> looping through the DataSet using collect() method.
> {code}
> 2016-08-15 07:18:52,503 WARN  
> org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to 
> serialize accumulators for task.
> java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
> at 
> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Suppressed: java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at 
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at 
> java.io.ByteArrayOutputStream.wr

[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type

2016-08-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2374
  
I will look at this in more detail later today, but I couldn't find any mistakes skimming over it. However, a small update to the Metrics documentation would be neat (which I forgot to list on my to-do list :( ).
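
For context on what such a documentation update would cover, here is a hedged sketch of how a user-defined meter is registered and updated, based on the Meter/MeterView API as it was eventually merged rather than on the exact state of this PR:

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class MeteredMapper extends RichMapFunction<String, String> {

	private transient Meter meter;

	@Override
	public void open(Configuration parameters) {
		// Register a meter backed by a 60-second moving rate.
		this.meter = getRuntimeContext()
				.getMetricGroup()
				.meter("myMeter", new MeterView(60));
	}

	@Override
	public String map(String value) {
		// Count one event towards the rate.
		this.meter.markEvent();
		return value;
	}
}
{code}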




[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422355#comment-15422355
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2374
  
I will look at this in more detail later today, but I couldn't find any mistakes skimming over it. However, a small update to the Metrics documentation would be neat (which I forgot to list on my to-do list :( ).


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>






[jira] [Commented] (FLINK-4393) Failed to serialize accumulators for task

2016-08-16 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422367#comment-15422367
 ] 

Robert Metzger commented on FLINK-4393:
---

3.6 GB is too much data to transfer using collect().
I think you have to work around this limitation. You could, for example, write the data into a file and read the file from the client.
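
A minimal sketch of that workaround with the DataSet API; the paths and the job body are placeholders, and the real pipeline would come from the Table API query in question:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class WriteInsteadOfCollect {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Placeholder source; the actual DataSet would be produced by the Table API query.
		DataSet<String> result = env.readTextFile("hdfs:///input/data.txt");

		// Write the result from the cluster instead of collect()-ing it to the client.
		result.writeAsText("hdfs:///output/result", FileSystem.WriteMode.OVERWRITE);

		env.execute("write result to file");

		// The client (or any downstream tool) can then read hdfs:///output/result directly,
		// without routing gigabytes through the accumulator serialization path that fails above.
	}
}
{code}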

> Failed to serialize accumulators for task
> -
>
> Key: FLINK-4393
> URL: https://issues.apache.org/jira/browse/FLINK-4393
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
> Environment: Redhat 6
>Reporter: Sajeev Ramakrishnan
>
> Dear Team,
>   I am getting the below exception while trying to use the Table API by 
> looping through the DataSet using collect() method.
> {code}
> 2016-08-15 07:18:52,503 WARN  
> org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to 
> serialize accumulators for task.
> java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
> at 
> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Suppressed: java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at 
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStre

[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2370
  
Failing test cases are unrelated. Will merge this PR. Thanks for your 
contribution @KurtYoung.




[jira] [Commented] (FLINK-4373) Introduce SlotID, AllocationID, ResourceProfile

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422383#comment-15422383
 ] 

ASF GitHub Bot commented on FLINK-4373:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2370
  
Failing test cases are unrelated. Will merge this PR. Thanks for your 
contribution @KurtYoung.


> Introduce SlotID, AllocationID, ResourceProfile
> ---
>
> Key: FLINK-4373
> URL: https://issues.apache.org/jira/browse/FLINK-4373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For the new version of cluster management, we need some more basic data 
> structures:
> * SlotID: Identifier of one single slot located in a task executor
> * AllocationID: Slot allocation identifier, created by the JobManager when 
> requesting a slot, constant across retries. Used to identify responses from 
> the ResourceManager and to identify deployment calls to the TaskManager 
> that the slot was allocated from.
> * ResourceProfile: The resource profile of the desired slot (currently only 
> CPU cores and memory are supported)
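
To make the three structures concrete, a purely illustrative sketch of their shape; the field names and types below are assumptions for illustration, not the actual classes added by the commit:

{code}
import java.io.Serializable;
import java.util.UUID;

/** Identifier of one single slot: which task executor, and which slot index on it. */
class SlotID implements Serializable {
	private final UUID resourceId;   // identifies the task executor (illustrative type)
	private final int slotNumber;    // slot index within that task executor

	SlotID(UUID resourceId, int slotNumber) {
		this.resourceId = resourceId;
		this.slotNumber = slotNumber;
	}
}

/** Allocation identifier created by the JobManager when requesting a slot; stable across retries. */
class AllocationID implements Serializable {
	private final UUID id = UUID.randomUUID();
}

/** Resource profile of the desired slot (currently CPU cores and memory). */
class ResourceProfile implements Serializable {
	private final double cpuCores;
	private final long memoryInMB;

	ResourceProfile(double cpuCores, long memoryInMB) {
		this.cpuCores = cpuCores;
		this.memoryInMB = memoryInMB;
	}
}
{code}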





[jira] [Resolved] (FLINK-4373) Introduce SlotID, AllocationID, ResourceProfile

2016-08-16 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4373.
--
Resolution: Done

Added via 08cf860169948c93c6c92ab1e2c70cf1a4266c6f

> Introduce SlotID, AllocationID, ResourceProfile
> ---
>
> Key: FLINK-4373
> URL: https://issues.apache.org/jira/browse/FLINK-4373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For the new version of cluster management, we need some more basic data 
> structures:
> * SlotID: Identifier of one single slot located in a task executor
> * AllocationID: Slot allocation identifier, created by the JobManager when 
> requesting a slot, constant across retries. Used to identify responses from 
> the ResourceManager and to identify deployment calls to the TaskManager 
> that the slot was allocated from.
> * ResourceProfile: The resource profile of the desired slot (currently only 
> CPU cores and memory are supported)





[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893388
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
final HashSet set = new HashSet();
-   final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
 
for (String key : this.backingConfig.keySet()) {
-   if (key.startsWith(this.prefix)) {
+
+   if (this.prefix == null) {
--- End diff --

Can we move the condition out of the loop? The condition does not change across iterations, so we don't have to evaluate it every time.




[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893448
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
--- End diff --

We usually don't have an empty line when a method block starts (also 
applies to lines 185 and in `DelegatingConfigurationTest` lines 96 and 110).




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422392#comment-15422392
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893388
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
final HashSet set = new HashSet();
-   final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
 
for (String key : this.backingConfig.keySet()) {
-   if (key.startsWith(this.prefix)) {
+
+   if (this.prefix == null) {
--- End diff --

Can we move the condition out of the loop? The condition does not change across iterations, so we don't have to evaluate it every time.


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink issue #2371: [FLINK-4309] Potential null pointer dereference in Delega...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2371
  
Thanks for your contribution @tsunny. Your changes look good. I only had a 
minor comment.




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422393#comment-15422393
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893448
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
--- End diff --

We usually don't have an empty line when a method block starts (also 
applies to lines 185 and in `DelegatingConfigurationTest` lines 96 and 110).


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893573
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
 ---
@@ -88,4 +90,49 @@ private String typeParamToString(Class[] classes) {
assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
}
}
+   
+   @Test
+   public void testDelegationConfigurationWithNullPrefix() {
+
+   Configuration backingConf = new Configuration();
+   backingConf.setValueInternal("test-key", "value");
+
+   DelegatingConfiguration configuration = new 
DelegatingConfiguration(
+   backingConf, null);
+   Set keySet = configuration.keySet();
+
+   assertEquals(keySet, backingConf.keySet());
+
+   }
+
+   @Test
+   public void testDelegationConfigurationWithPrefix() {
+
+   String prefix = "pref-";
+   String expectedKey = "key";
+
+   /*
+* Key matches the prefix
+*/
+   Configuration backingConf = new Configuration();
+   backingConf.setValueInternal(prefix + expectedKey, "value");
+
+   DelegatingConfiguration configuration = new 
DelegatingConfiguration(backingConf, prefix);
+   Set keySet = configuration.keySet();
+   
--- End diff --

empty line




[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893581
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
 ---
@@ -88,4 +90,49 @@ private String typeParamToString(Class[] classes) {
assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
}
}
+   
+   @Test
+   public void testDelegationConfigurationWithNullPrefix() {
+
+   Configuration backingConf = new Configuration();
+   backingConf.setValueInternal("test-key", "value");
+
+   DelegatingConfiguration configuration = new 
DelegatingConfiguration(
+   backingConf, null);
+   Set keySet = configuration.keySet();
+
+   assertEquals(keySet, backingConf.keySet());
+
--- End diff --

empty line




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422395#comment-15422395
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893581
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
 ---
@@ -88,4 +90,49 @@ private String typeParamToString(Class[] classes) {
assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
}
}
+   
+   @Test
+   public void testDelegationConfigurationWithNullPrefix() {
+
+   Configuration backingConf = new Configuration();
+   backingConf.setValueInternal("test-key", "value");
+
+   DelegatingConfiguration configuration = new 
DelegatingConfiguration(
+   backingConf, null);
+   Set keySet = configuration.keySet();
+
+   assertEquals(keySet, backingConf.keySet());
+
--- End diff --

empty line


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422394#comment-15422394
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893573
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
 ---
@@ -88,4 +90,49 @@ private String typeParamToString(Class[] classes) {
assertTrue("Foo method '" + 
configurationMethod.getName() + "' has not been wrapped correctly in 
DelegatingConfiguration wrapper", hasMethod);
}
}
+   
+   @Test
+   public void testDelegationConfigurationWithNullPrefix() {
+
+   Configuration backingConf = new Configuration();
+   backingConf.setValueInternal("test-key", "value");
+
+   DelegatingConfiguration configuration = new 
DelegatingConfiguration(
+   backingConf, null);
+   Set keySet = configuration.keySet();
+
+   assertEquals(keySet, backingConf.keySet());
+
+   }
+
+   @Test
+   public void testDelegationConfigurationWithPrefix() {
+
+   String prefix = "pref-";
+   String expectedKey = "key";
+
+   /*
+* Key matches the prefix
+*/
+   Configuration backingConf = new Configuration();
+   backingConf.setValueInternal(prefix + expectedKey, "value");
+
+   DelegatingConfiguration configuration = new 
DelegatingConfiguration(backingConf, prefix);
+   Set keySet = configuration.keySet();
+   
--- End diff --

empty line


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893687
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
final HashSet set = new HashSet();
-   final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
 
for (String key : this.backingConfig.keySet()) {
-   if (key.startsWith(this.prefix)) {
+
+   if (this.prefix == null) {
--- End diff --

Yes, we can use `addAll`.




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422396#comment-15422396
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2371
  
Thanks for your contribution @tsunny. Your changes look good. I only had a 
minor comment.


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink issue #2342: FLINK-4253 - Rename "recovery.mode" config key to "high-a...

2016-08-16 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2342
  
@uce 
Updated with changes so that all the configs are renamed from 'recovery' to 
'high-availability'.




[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422398#comment-15422398
 ] 

ASF GitHub Bot commented on FLINK-4253:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2342
  
@uce 
Updated with changes so that all the configs are renamed from 'recovery' to 
'high-availability'.


> Rename "recovery.mode" config key to "high-availability"
> 
>
> Key: FLINK-4253
> URL: https://issues.apache.org/jira/browse/FLINK-4253
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: ramkrishna.s.vasudevan
>
> Currently, HA is configured via the following configuration keys:
> {code}
> recovery.mode: STANDALONE // No high availability (HA)
> recovery.mode: ZOOKEEPER // HA
> {code}
> This could be more straight forward by simply renaming the key to 
> {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We 
> already have standalone cluster mode.
> {code}
> high-availability: NONE // No HA
> high-availability: ZOOKEEPER // HA via ZooKeeper
> {code}
> The {{recovery.mode}} configuration keys would have to be deprecated before 
> completely removing them.





[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422397#comment-15422397
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893687
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
final HashSet set = new HashSet();
-   final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
 
for (String key : this.backingConfig.keySet()) {
-   if (key.startsWith(this.prefix)) {
+
+   if (this.prefix == null) {
--- End diff --

Yes, we can use `addAll`.


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink issue #2371: [FLINK-4309] Potential null pointer dereference in Delega...

2016-08-16 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2371
  
Changes look good all in all. Could you also add a 
`org.apache.flink.util.Preconditions.checkNotNull` check for the backing config 
in the constructor?




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422399#comment-15422399
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2371
  
Changes look good all in all. Could you also add a 
`org.apache.flink.util.Preconditions.checkNotNull` check for the backing config 
in the constructor?


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74894098
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
final HashSet set = new HashSet();
-   final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
 
for (String key : this.backingConfig.keySet()) {
-   if (key.startsWith(this.prefix)) {
+
+   if (this.prefix == null) {
--- End diff --

Actually, it's probably better to just return the keySet of the backing 
config.
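
Combining the review suggestions (hoist the null check out of the loop; for a null prefix simply return the backing config's key set), a sketch of what the keySet() method body could look like. Whether matched keys should be returned with the prefix stripped is not visible in this excerpt, so the sketch keeps them as-is:

{code}
@Override
public Set<String> keySet() {
	// Null prefix: the view covers the whole backing config, so return its keys directly.
	if (this.prefix == null) {
		return this.backingConfig.keySet();
	}

	// Non-null prefix: filter once, without re-evaluating the null check per iteration.
	final Set<String> set = new HashSet<>();
	for (String key : this.backingConfig.keySet()) {
		if (key.startsWith(this.prefix)) {
			set.add(key); // or key.substring(this.prefix.length()) if keys should be un-prefixed
		}
	}
	return set;
}
{code}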




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422400#comment-15422400
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74894098
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
final HashSet set = new HashSet();
-   final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
 
for (String key : this.backingConfig.keySet()) {
-   if (key.startsWith(this.prefix)) {
+
+   if (this.prefix == null) {
--- End diff --

Actually, it's probably better to just return the keySet of the backing 
config.


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[jira] [Created] (FLINK-4401) Add docs about ordering in streams

2016-08-16 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-4401:
--

 Summary: Add docs about ordering in streams
 Key: FLINK-4401
 URL: https://issues.apache.org/jira/browse/FLINK-4401
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Ufuk Celebi


I've seen questions about ordering guarantees in streams come up frequently on 
the mailing lists. We should add a page about this to the docs in order to make 
it explicit what kind of ordering guarantees users get for streams, keyed 
streams, windows, etc.





[GitHub] flink issue #2371: [FLINK-4309] Potential null pointer dereference in Delega...

2016-08-16 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2371
  
Thanks for addressing our comments! I'm going to merge this later today.




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422473#comment-15422473
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74901158
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -56,6 +57,7 @@ public DelegatingConfiguration() {
 */
public DelegatingConfiguration(Configuration backingConfig, String 
prefix)
{
+   Preconditions.checkNotNull(backingConfig);
this.backingConfig = backingConfig;
--- End diff --

Trivial note: We can also write `this.backingConfig = 
checkNotNull(backingConfig)` here.


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74901158
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -56,6 +57,7 @@ public DelegatingConfiguration() {
 */
public DelegatingConfiguration(Configuration backingConfig, String 
prefix)
{
+   Preconditions.checkNotNull(backingConfig);
this.backingConfig = backingConfig;
--- End diff --

Trivial note: We can also write `this.backingConfig = 
checkNotNull(backingConfig)` here.




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422474#comment-15422474
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2371
  
Thanks for addressing our comments! I'm going to merge this later today.


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink issue #2371: [FLINK-4309] Potential null pointer dereference in Delega...

2016-08-16 Thread tsunny
Github user tsunny commented on the issue:

https://github.com/apache/flink/pull/2371
  
No problem. Happy to contribute. Thanks for the review comments.




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422489#comment-15422489
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user tsunny commented on the issue:

https://github.com/apache/flink/pull/2371
  
No problem. Happy to contribute. Thanks for the review comments.


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2371




[jira] [Commented] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422573#comment-15422573
 ] 

ASF GitHub Bot commented on FLINK-4309:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2371


> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
> Fix For: 1.2.0, 1.1.2
>
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[jira] [Closed] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-16 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4309.
--
   Resolution: Fixed
Fix Version/s: 1.1.2
   1.2.0

Fixed in ad34540 (master), c2906c0 (release-1.1)

> Potential null pointer dereference in DelegatingConfiguration#keySet()
> --
>
> Key: FLINK-4309
> URL: https://issues.apache.org/jira/browse/FLINK-4309
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Sunny T
>Priority: Minor
> Fix For: 1.2.0, 1.1.2
>
>
> {code}
> final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
> for (String key : this.backingConfig.keySet()) {
>   if (key.startsWith(this.prefix)) {
> {code}
> If this.prefix == null, we would get NPE in startsWith():
> {code}
> public boolean startsWith(String prefix, int toffset) {
> char ta[] = value;
> int to = toffset;
> char pa[] = prefix.value;
> {code}





[GitHub] flink pull request #2342: FLINK-4253 - Rename "recovery.mode" config key to ...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r74918068
  
--- Diff: docs/setup/config.md ---
@@ -285,7 +285,7 @@ of the JobManager, because the same ActorSystem is 
used. Its not possible to use
 
 ## High Availability Mode
 
-- `recovery.mode`: (Default 'standalone') Defines the recovery mode used 
for the cluster execution. Currently, Flink supports the 'standalone' mode 
where only a single JobManager runs and no JobManager state is checkpointed. 
The high availability mode 'zookeeper' supports the execution of multiple 
JobManagers and JobManager state checkpointing. Among the group of JobManagers, 
ZooKeeper elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.
+- `high-availability`: (Default 'none') Defines the recovery mode used for 
the cluster execution. Currently, Flink supports the 'none' mode where only a 
single JobManager runs and no JobManager state is checkpointed. The high 
availability mode 'zookeeper' supports the execution of multiple JobManagers 
and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper 
elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.  Previously this config was 
named 'recovery.mode' and the default config was 'standalone'.
 
 - `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is 
used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is 
selected
--- End diff --

Docs not updated (same for jobmanager_high_availability.md)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422617#comment-15422617
 ] 

ASF GitHub Bot commented on FLINK-4253:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r74918068
  
--- Diff: docs/setup/config.md ---
@@ -285,7 +285,7 @@ of the JobManager, because the same ActorSystem is 
used. Its not possible to use
 
 ## High Availability Mode
 
-- `recovery.mode`: (Default 'standalone') Defines the recovery mode used 
for the cluster execution. Currently, Flink supports the 'standalone' mode 
where only a single JobManager runs and no JobManager state is checkpointed. 
The high availability mode 'zookeeper' supports the execution of multiple 
JobManagers and JobManager state checkpointing. Among the group of JobManagers, 
ZooKeeper elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.
+- `high-availability`: (Default 'none') Defines the recovery mode used for 
the cluster execution. Currently, Flink supports the 'none' mode where only a 
single JobManager runs and no JobManager state is checkpointed. The high 
availability mode 'zookeeper' supports the execution of multiple JobManagers 
and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper 
elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.  Previously this config was 
named 'recovery.mode' and the default config was 'standalone'.
 
 - `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is 
used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is 
selected
--- End diff --

Docs not updated (same for jobmanager_high_availability.md)


> Rename "recovery.mode" config key to "high-availability"
> 
>
> Key: FLINK-4253
> URL: https://issues.apache.org/jira/browse/FLINK-4253
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: ramkrishna.s.vasudevan
>
> Currently, HA is configured via the following configuration keys:
> {code}
> recovery.mode: STANDALONE // No high availability (HA)
> recovery.mode: ZOOKEEPER // HA
> {code}
> This could be more straightforward by simply renaming the key to 
> {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We 
> already have standalone cluster mode.
> {code}
> high-availability: NONE // No HA
> high-availability: ZOOKEEPER // HA via ZooKeeper
> {code}
> The {{recovery.mode}} configuration keys would have to be deprecated before 
> completely removing them.
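
A minimal sketch of the fallback that deprecating the old key implies: read `high-availability` first and only fall back to the legacy `recovery.mode` value. The helper below is illustrative and does not claim to match Flink's actual configuration classes.

{code}
import java.util.Properties;

// Illustrative helper: prefer the new key, fall back to the deprecated one,
// and map the old default 'standalone' onto the new default 'none'.
public class HighAvailabilityModeResolver {
    static String resolveMode(Properties conf) {
        String value = conf.getProperty("high-availability");
        if (value == null) {
            String legacy = conf.getProperty("recovery.mode", "standalone");
            value = "standalone".equalsIgnoreCase(legacy) ? "none" : legacy;
        }
        return value.toLowerCase();
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("recovery.mode", "ZOOKEEPER");
        System.out.println(resolveMode(conf)); // prints "zookeeper"
    }
}
{code}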



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2342: FLINK-4253 - Rename "recovery.mode" config key to ...

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r74918209
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -77,7 +77,7 @@
 
/**
 * Shutdown hook thread to ensure deletion of the storage directory (or 
null if
-* the configured recovery mode does not equal{@link 
RecoveryMode#STANDALONE})
+* the configured recovery mode does not equal{@link RecoveryMode#NONE})
--- End diff --

I think we should also rename this enum to `HighAvailabilityMode` or so.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422618#comment-15422618
 ] 

ASF GitHub Bot commented on FLINK-4253:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r74918209
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -77,7 +77,7 @@
 
/**
 * Shutdown hook thread to ensure deletion of the storage directory (or 
null if
-* the configured recovery mode does not equal{@link 
RecoveryMode#STANDALONE})
+* the configured recovery mode does not equal{@link RecoveryMode#NONE})
--- End diff --

I think we should also rename this enum to `HighAvailabilityMode` or so.


> Rename "recovery.mode" config key to "high-availability"
> 
>
> Key: FLINK-4253
> URL: https://issues.apache.org/jira/browse/FLINK-4253
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: ramkrishna.s.vasudevan
>
> Currently, HA is configured via the following configuration keys:
> {code}
> recovery.mode: STANDALONE // No high availability (HA)
> recovery.mode: ZOOKEEPER // HA
> {code}
> This could be more straightforward by simply renaming the key to 
> {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We 
> already have standalone cluster mode.
> {code}
> high-availability: NONE // No HA
> high-availability: ZOOKEEPER // HA via ZooKeeper
> {code}
> The {{recovery.mode}} configuration keys would have to be deprecated before 
> completely removing them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type

2016-08-16 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2374
  
Hi @zentol 
Thank you for your prompt review.
Sorry, I was not aware that there is Metrics documentation. I'll update the 
PR today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422619#comment-15422619
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2374
  
Hi @zentol 
Thank you for your prompt review.
Sorry, I was not aware that there is Metrics documentation. I'll update the 
PR today.


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2370
  
Not automatically closed by the asfgit bot. @KurtYoung could you close this 
PR manually?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4373) Introduce SlotID, AllocationID, ResourceProfile

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422656#comment-15422656
 ] 

ASF GitHub Bot commented on FLINK-4373:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2370
  
Not automatically closed by the asfgit bot. @KurtYoung could you close this 
PR manually?


> Introduce SlotID, AllocationID, ResourceProfile
> ---
>
> Key: FLINK-4373
> URL: https://issues.apache.org/jira/browse/FLINK-4373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For the new version of cluster management, we need some more basic data 
> structures:
> * SlotID: Identifier of one single slot located in a task executor
> * AllocationID: Slot allocation identifier, created by the JobManager when 
> requesting a slot, constant across re-tries. Used to identify responses by 
> the ResourceManager and to identify deployment calls towards the TaskManager 
> that was allocated from.
> * ResourceProfile: The resource profile of the desired slot (currently only 
> cpu cores and memory are supported)
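
A rough sketch of what these value types could look like, using only what the description above states; the field sets and names are illustrative, not the actual implementation:

{code}
import java.util.UUID;

// Illustrative value types inferred from the description above.
class ResourceProfile {
    final double cpuCores;
    final long memoryInMB;
    ResourceProfile(double cpuCores, long memoryInMB) {
        this.cpuCores = cpuCores;
        this.memoryInMB = memoryInMB;
    }
}

class SlotID {
    final String taskExecutorId; // identifies the task executor hosting the slot
    final int slotNumber;        // index of the slot within that task executor
    SlotID(String taskExecutorId, int slotNumber) {
        this.taskExecutorId = taskExecutorId;
        this.slotNumber = slotNumber;
    }
}

class AllocationID {
    // Created by the JobManager when requesting a slot; stays constant across retries.
    final UUID id = UUID.randomUUID();
}
{code}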



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4399) Add support for oversized messages

2016-08-16 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422667#comment-15422667
 ] 

Till Rohrmann commented on FLINK-4399:
--

Hi Felix,

both issues have the same idea. However, this issue tries to solve the problem 
at a lower level, namely at the level of the remote procedure calls. This means 
that we support "large" messages/RPCs in general instead of only supporting it 
for the accumulators.

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
> Fix For: 1.2.0
>
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-16 Thread KurtYoung
Github user KurtYoung closed the pull request at:

https://github.com/apache/flink/pull/2370


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...

2016-08-16 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2370
  
@tillrohrmann Yeah, of course. Closing it now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4373) Introduce SlotID, AllocationID, ResourceProfile

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422674#comment-15422674
 ] 

ASF GitHub Bot commented on FLINK-4373:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2370
  
@tillrohrmann Yeah, of course. Closing it now.


> Introduce SlotID, AllocationID, ResourceProfile
> ---
>
> Key: FLINK-4373
> URL: https://issues.apache.org/jira/browse/FLINK-4373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For the new version of cluster management, we need some more basic data 
> structures:
> * SlotID: Identifier of one single slot located in a task executor
> * AllocationID: Slot allocation identifier, created by the JobManager when 
> requesting a slot, constant across re-tries. Used to identify responses by 
> the ResourceManager and to identify deployment calls towards the TaskManager 
> that was allocated from.
> * ResourceProfile: The resource profile of the desired slot (currently only 
> cpu cores and memory are supported)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4373) Introduce SlotID, AllocationID, ResourceProfile

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422675#comment-15422675
 ] 

ASF GitHub Bot commented on FLINK-4373:
---

Github user KurtYoung closed the pull request at:

https://github.com/apache/flink/pull/2370


> Introduce SlotID, AllocationID, ResourceProfile
> ---
>
> Key: FLINK-4373
> URL: https://issues.apache.org/jira/browse/FLINK-4373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For the new version of cluster management, we need some more basic data 
> structures:
> * SlotID: Identifier of one single slot located in a task executor
> * AllocationID: Slot allocation identifier, created by the JobManager when 
> requesting a slot, constant across re-tries. Used to identify responses by 
> the ResourceManager and to identify deployment calls towards the TaskManager 
> that was allocated from.
> * ResourceProfile: The resource profile of the desired slot (currently only 
> cpu cores and memory are supported)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...

2016-08-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2353
  
I like that idea.

Getting the TaskExecutor out of the registration means that we cannot 
transmit the slot report immediately with each registration call (it only makes 
sense when each attempt grabs the latest slot report).

That might not be too bad for the TaskExecutor, as it can always just 
eagerly transmit a slot report (such as when responding to a heartbeat).
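
A tiny sketch of that alternative, with purely made-up class names (not the FLIP-6 classes): the TaskExecutor builds a fresh slot report whenever it answers a heartbeat, so nothing needs to be attached to the registration call itself.

{code}
import java.util.ArrayList;
import java.util.List;

// All names here are illustrative only.
public class HeartbeatSlotReportSketch {

    static class SlotStatus {
        final int slotIndex;
        final boolean allocated;
        SlotStatus(int slotIndex, boolean allocated) {
            this.slotIndex = slotIndex;
            this.allocated = allocated;
        }
    }

    static class SlotReport {
        final List<SlotStatus> slots;
        SlotReport(List<SlotStatus> slots) { this.slots = slots; }
    }

    static class TaskExecutor {
        private final boolean[] allocated = new boolean[4]; // toy state: 4 slots

        // Called for every heartbeat from the ResourceManager: always returns
        // the latest view of the slots, taken at heartbeat time.
        SlotReport onResourceManagerHeartbeat() {
            List<SlotStatus> slots = new ArrayList<>();
            for (int i = 0; i < allocated.length; i++) {
                slots.add(new SlotStatus(i, allocated[i]));
            }
            return new SlotReport(slots);
        }
    }

    public static void main(String[] args) {
        System.out.println(new TaskExecutor().onResourceManagerHeartbeat().slots.size()); // 4
    }
}
{code}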


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4355) Implement TaskManager side of registration at ResourceManager

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422723#comment-15422723
 ] 

ASF GitHub Bot commented on FLINK-4355:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2353
  
I like that idea.

Getting the TaskExecutor out of the registration means that we cannot 
transmit the slot report immediately with each registration call (it only makes 
sense when each attempt grabs the latest slot report).

That might not be too bad for the TaskExecutor, as it can always just 
eagerly transmit a slot report (such as when responding to a heartbeat).


> Implement TaskManager side of registration at ResourceManager
> -
>
> Key: FLINK-4355
> URL: https://issues.apache.org/jira/browse/FLINK-4355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Stephan Ewen
>
> If the {{TaskManager}} is unregistered, it should try and register at the 
> {{ResourceManager}} leader. The registration messages are fenced via the 
> {{RmLeaderID}}.
> The ResourceManager may acknowledge the registration (or respond that the 
> TaskManager is AlreadyRegistered) or refuse the registration.
> Upon registration refusal, the TaskManager may have to kill itself.
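
A condensed sketch of the decision logic the description implies, with hypothetical type names (the actual FLIP-6 messages may differ):

{code}
import java.util.UUID;

// Hypothetical registration outcomes and how the TaskManager could react to them.
public class RegistrationSketch {

    enum Outcome { ACKNOWLEDGED, ALREADY_REGISTERED, REFUSED }

    static class RegistrationResponse {
        final Outcome outcome;
        final UUID resourceManagerLeaderId; // fencing token for the messages
        RegistrationResponse(Outcome outcome, UUID leaderId) {
            this.outcome = outcome;
            this.resourceManagerLeaderId = leaderId;
        }
    }

    static void handle(RegistrationResponse response, UUID expectedLeaderId) {
        if (!expectedLeaderId.equals(response.resourceManagerLeaderId)) {
            return; // fenced out: response belongs to an old leader, ignore it
        }
        switch (response.outcome) {
            case ACKNOWLEDGED:
            case ALREADY_REGISTERED:
                // proceed as registered in both cases
                break;
            case REFUSED:
                // upon refusal the TaskManager may have to shut itself down
                throw new IllegalStateException("Registration refused, shutting down");
        }
    }

    public static void main(String[] args) {
        UUID leader = UUID.randomUUID();
        handle(new RegistrationResponse(Outcome.ACKNOWLEDGED, leader), leader);
        System.out.println("registered");
    }
}
{code}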



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4402) Wrong metrics parameter names in documentation

2016-08-16 Thread RWenden (JIRA)
RWenden created FLINK-4402:
--

 Summary: Wrong metrics parameter names in documentation 
 Key: FLINK-4402
 URL: https://issues.apache.org/jira/browse/FLINK-4402
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.1
 Environment: all
Reporter: RWenden
Priority: Trivial
 Fix For: 1.1.2


On the page 
https://ci.apache.org/projects/flink/flink-docs-master/apis/metrics.html
the following metrics parameter names are wrong:

faulty: metrics.scope.tm.task, should be metrics.scope.task
faulty: metrics.scope.tm.operator, should be metrics.scope.operator

These are the names that make it work on Flink 1.1.1.

But to fix this, the constants in ConfigConstants.java can also be changed to 
fit the documentation. Either way...
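
For reference, a hedged snippet with the corrected keys as constants; the class and constant names are invented for illustration and do not claim to match ConfigConstants.java:

{code}
// Illustrative only: the scope-format keys that actually work on Flink 1.1.1,
// as opposed to the ones currently shown in the documentation.
public final class MetricScopeKeys {
    public static final String SCOPE_TASK     = "metrics.scope.task";      // docs say metrics.scope.tm.task
    public static final String SCOPE_OPERATOR = "metrics.scope.operator";  // docs say metrics.scope.tm.operator

    private MetricScopeKeys() {}
}
{code}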



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74937306
  
--- Diff: flink-dist/pom.xml ---
@@ -113,8 +113,13 @@ under the License.
flink-metrics-jmx
${project.version}

+
+   
+   org.apache.flink
+   flink-mesos_2.10
+   ${project.version}
+   
--- End diff --

We might introduce an "include-mesos" profile where we add this dependency 
to flink-dist.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422742#comment-15422742
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74937306
  
--- Diff: flink-dist/pom.xml ---
@@ -113,8 +113,13 @@ under the License.
flink-metrics-jmx
${project.version}

+
+   
+   org.apache.flink
+   flink-mesos_2.10
+   ${project.version}
+   
--- End diff --

We might introduce an "include-mesos" profile where we add this dependency 
to flink-dist.


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74937890
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
--- End diff --

Can we remove this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938223
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
+
+   
+   com.google.guava
+   guava
+   ${guava.version}
+   
--- End diff --

Are you using Guava or is this a dependency for Mesos?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422747#comment-15422747
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938223
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
+
+   
+   com.google.guava
+   guava
+   ${guava.version}
+   
--- End diff --

Are you using Guava or is this a dependency for Mesos?


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938631
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
+
+   
+   com.google.guava
+   guava
+   ${guava.version}
+   
+
+   
+   org.apache.curator
+   curator-test
+   ${curator.version}
+   test
--- End diff --

It might make sense to add this dependency to the dependency management 
section of the parent pom since also other modules use it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422750#comment-15422750
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938631
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
+
+   
+   com.google.guava
+   guava
+   ${guava.version}
+   
+
+   
+   org.apache.curator
+   curator-test
+   ${curator.version}
+   test
--- End diff --

It might make sense to add this dependency to the dependency management 
section of the parent pom since also other modules use it.


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422752#comment-15422752
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938853
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
--- End diff --

Maybe group all test dependencies under the test structural comment.


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422745#comment-15422745
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74937890
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
--- End diff --

Can we remove this?


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74940821
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {
--- End diff --

Java docs are missing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422759#comment-15422759
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74940821
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {
--- End diff --

Java docs are missing.


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422764#comment-15422764
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941466
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map 
getCustomNamedResources() {
+   return Collections.emptyMap();

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941466
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map 
getCustomNamedResources() {
+   return Collections.emptyMap();
+   }
+
+   @Override
+   public List getHardConstraints() 
{
+   return null;
+   }
+
+   @Override
+   public List 
getSoftConstraints() {
   

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941614
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map 
getCustomNamedResources() {
+   return Collections.emptyMap();
+   }
+
+   @Override
+   public List getHardConstraints() 
{
+   return null;
+   }
+
+   @Override
+   public List 
getSoftConstraints() {
   

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941852
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference<TaskAssignmentResult> assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map<String, Double> getCustomNamedResources() {
+   return Collections.emptyMap();
+   }
+
+   @Override
+   public List<? extends ConstraintEvaluator> getHardConstraints() {
+   return null;
+   }
+
+   @Override
+   public List<? extends VMTaskFitnessCalculator> getSoftConstraints() {
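
As a side note on the two TM_PORT_KEYS above: a minimal sketch (not taken from this PR; the helper name and the List<Integer> of Mesos-assigned ports are assumptions) of how dynamically allocated ports could be written into a Flink Configuration:

    import java.util.List;
    import org.apache.flink.configuration.Configuration;

    class PortConfigSketch {
        static final String[] TM_PORT_KEYS = {
                "taskmanager.rpc.port",
                "taskmanager.data.port" };

        // Copies one Mesos-assigned port per dynamic config key (sketch only).
        static void applyAssignedPorts(Configuration dynamicConfig, List<Integer> assignedPorts) {
            for (int i = 0; i < TM_PORT_KEYS.length && i < assignedPorts.size(); i++) {
                dynamicConfig.setInteger(TM_PORT_KEYS[i], assignedPorts.get(i));
            }
        }
    }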
   

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422767#comment-15422767
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941614
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference<TaskAssignmentResult> assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map<String, Double> getCustomNamedResources() {
+   return Collections.


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938853
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-parent</artifactId>
+      <version>1.1-SNAPSHOT</version>
+      <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-mesos_2.10</artifactId>
+   <name>flink-mesos</name>
+   <packaging>jar</packaging>
+
+   <properties>
+      <mesos.version>0.27.1</mesos.version>
+   </properties>
+
+   <dependencies>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-runtime_2.10</artifactId>
+         <version>${project.version}</version>
+         <exclusions>
+            <exclusion>
+               <artifactId>hadoop-core</artifactId>
+               <groupId>org.apache.hadoop</groupId>
+            </exclusion>
+         </exclusions>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-clients_2.10</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>${shading-artifact.name}</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-test-utils_2.10</artifactId>
+         <version>${project.version}</version>
+         <type>test-jar</type>
+         <scope>test</scope>
+      </dependency>
--- End diff --

Maybe group all test dependencies under the test structural comment.




[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...

2016-08-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2369
  
Just looked over this briefly.
What struck me first is that this again uses the dirty trick of adding a 
dependency to "Flink Kafka 0.9" and then transitively excluding "Kafka 0.9". Is 
there a nicer way to solve this?




[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422807#comment-15422807
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2369
  
Just looked over this briefly.
What struck me first is that this again uses the dirty trick of adding a 
dependency to "Flink Kafka 0.9" and then transitively excluding "Kafka 0.9". Is 
there a nicer way to solve this?


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher compressed 
> messages, assign offset to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.
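
For illustration, a minimal sketch of the timestamp-aware producer API the description refers to (broker address, topic, and values are placeholders, not part of this issue or the Flink connector code):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TimestampedProduceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                long eventTime = System.currentTimeMillis();
                // Kafka 0.10 adds the (topic, partition, timestamp, key, value) record constructor.
                producer.send(new ProducerRecord<String, String>("my-topic", null, eventTime, "key", "value"));
            }
        }
    }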



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74947651
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+   // 
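
Regarding the UserGroupInformation / PrivilegedAction imports quoted above: a minimal sketch of the usual Hadoop pattern they suggest (the runMasterActors() helper and the user lookup are placeholders, not the PR's actual code):

    import java.security.PrivilegedAction;
    import org.apache.hadoop.security.UserGroupInformation;

    class SecuredRunnerSketch {
        public static void main(String[] args) {
            // Placeholder user; a real runner would derive this from its environment.
            String user = System.getProperty("user.name");
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
            int exitCode = ugi.doAs(new PrivilegedAction<Integer>() {
                @Override
                public Integer run() {
                    // hypothetical: start the master actor system and actors here
                    return runMasterActors();
                }
            });
            if (exitCode != 0) {
                System.exit(exitCode);
            }
        }

        private static int runMasterActors() {
            return 0; // placeholder
        }
    }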


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422811#comment-15422811
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74947914
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master fail

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422814#comment-15422814
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74948145
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master fail

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74948586
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+   // 

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74948792
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+   // 

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74949114
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+   // 
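
On the class comment quoted above ("starts actor system and the actors ..."): a minimal sketch of that generic Akka startup pattern (the placeholder actor and names are assumptions, not Flink's actual JobManager startup code):

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.actor.UntypedActor;

    public class MasterActorsSketch {
        // Placeholder standing in for the JobManager / resource manager actors.
        public static class PlaceholderActor extends UntypedActor {
            @Override
            public void onReceive(Object message) {
                unhandled(message);
            }
        }

        public static void main(String[] args) {
            ActorSystem actorSystem = ActorSystem.create("flink-mesos-sketch");
            ActorRef jobManager = actorSystem.actorOf(Props.create(PlaceholderActor.class), "jobmanager");
            ActorRef resourceManager = actorSystem.actorOf(Props.create(PlaceholderActor.class), "resourcemanager");
            // A real runner would watch these actors and exit with ACTOR_DIED_EXIT_CODE if one dies.
            actorSystem.shutdown();
        }
    }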

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74947914
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74949669
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74948145
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422832#comment-15422832
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74949669
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950002
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422833#comment-15422833
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950002
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950095
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422834#comment-15422834
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950095
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950716
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422839#comment-15422839
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950716
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---

[GitHub] flink pull request #2376: [FLINK-3755] Introduce key groups for key-value st...

2016-08-16 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2376

[FLINK-3755] Introduce key groups for key-value state to support dynamic 
scaling

This pull request introduces the concept of key groups to Flink. A key 
group is the smallest assignable unit of key-value state to a stream operator. 
Put differently, it is a subset of the key space which can be assigned to a 
stream operator. With the introduction of key groups, it becomes possible to 
dynamically scale Flink operators that use partitioned (= key-value) state.

In particular, this pull request addresses the following sub-issues:
- fully: [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism 
Parameter
- partially: [FLINK-4381] Refactor State to Prepare For Key-Group State 
Backends

Furthermore, this pull request is partially based on pull request: #1988

Overall, this pull request introduces the following changes:

# 1) Adopted from #1988 (plus the introduction of distributing keys as ranges, `KeyGroupRange`)

## a) Introduction of KeyGroupAssigner

In order to partition keys into key groups, the `KeyGroupAssigner` interface 
is introduced. It allows partitioning the key space into smaller units 
which can be assigned to operators. Scaling the parallelism up or down is then 
performed by simply reassigning the key groups to more or fewer operators.

For this pull request, the former `HashPartitioner` has been renamed to 
`KeyGroupStreamPartitioner` and uses the `KeyGroupAssigner` to distribute the 
streaming records consistently with the key group mappings. The key 
groups, in turn, are mapped as ranges of key groups (`key group index * 
parallelism / number of key groups` = outgoing channel) to the downstream 
tasks.

When restoring from a checkpoint or savepoint, scaling the parallelism up or down 
basically boils down to splitting or merging the key group ranges in alignment 
with the adjusted assignment to operators, which happens automatically through 
the `KeyGroupStreamPartitioner`.
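As a rough illustration of the mapping described above, here is a minimal sketch assuming a hash-based assigner; the class name and method signatures are illustrative and not necessarily the exact API introduced by this PR:

```
public class HashKeyGroupAssignerSketch<K> {

	private final int numberOfKeyGroups; // equals the max parallelism

	public HashKeyGroupAssignerSketch(int numberOfKeyGroups) {
		this.numberOfKeyGroups = numberOfKeyGroups;
	}

	/** Maps a key to its key group index in [0, numberOfKeyGroups). */
	public int getKeyGroupIndex(K key) {
		return ((key.hashCode() % numberOfKeyGroups) + numberOfKeyGroups) % numberOfKeyGroups;
	}

	/** Maps a key group to the outgoing channel, using the formula quoted above. */
	public int getOutgoingChannel(int keyGroupIndex, int parallelism) {
		return keyGroupIndex * parallelism / numberOfKeyGroups;
	}
}
```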

## b) Introduction of MaxParallelism to user API

In order to scale programs up or down, it is necessary to define the 
maximum number of key groups. The maximum number of key groups denotes the 
maximum parallelism of an operator, because every operator needs at least one 
key group to get elements assigned. To specify this upper limit, 
the ExecutionConfig allows setting a job-wide max parallelism value via 
ExecutionConfig.setMaxParallelism. In addition, the SingleOutputStreamOperator 
allows setting a max parallelism value per operator, analogous to the 
parallelism setting. If no max parallelism has been set for the operator and 
there is no job-wide max parallelism, the parallelism of the operator is taken 
as its max parallelism, so every parallel operator instance would then receive a 
single key group. Currently, we restrict the maximum number of key groups to 
2^15 (Short.MAX_VALUE).
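A hedged usage sketch of these settings (the job-wide setter is named in the text above; the operator-level `setMaxParallelism` call is assumed to mirror it):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// job-wide upper limit = number of key groups
		env.getConfig().setMaxParallelism(128);

		env.fromElements("a", "b", "c")
			.keyBy(value -> value)
			.map(String::toUpperCase)
			.setParallelism(4)
			// operator-level max parallelism (assumed setter on SingleOutputStreamOperator)
			.setMaxParallelism(256);

		env.execute("max parallelism sketch");
	}
}
```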

# 2)  State and StateHandle refactoring

## a) StateHandle refactoring

We have simplified and cleaned up the hierarchy and use cases of state 
handles. `StreamStateHandle` and `RetrievableStateHandle` are now at the heart 
of the state handle system.
Their main conceptual difference is that `StreamStateHandle` provides a 
seekable input stream to the actual state data and leaves state reconstruction 
to client code, whereas `RetrievableStateHandle` lets client code retrieve the 
state as a readily constructed object, with the state 
handle implementation taking care of state reconstruction.
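A minimal sketch of the contrast (names follow the description above; the exact signatures in the PR may differ):

```
import java.io.IOException;
import org.apache.flink.core.fs.FSDataInputStream;

/** Hands out a seekable stream over the raw state bytes; the caller deserializes. */
interface StreamStateHandleSketch {
	FSDataInputStream openInputStream() throws IOException;
}

/** Reconstructs the state object itself; deserialization is the handle's job. */
interface RetrievableStateHandleSketch<T> {
	T retrieveState() throws Exception;
}
```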

## b) Operator serialization

The unified abstraction for operators to persist their state is 
`CheckpointStateOutputStream`. All operators can write their serialized state 
directly into this stream, which returns a `StreamStateHandle` on 
close. `StreamTaskState` and `StreamTaskStateList` become obsolete. This change 
makes versioning of operator state serialization formats easier, and we should 
ensure and test that our operators are aware of serialization versions.

This change leaves the following methods for snapshot/restore in 
`StreamOperator`:
```
void snapshotState(
    FSDataOutputStream out,
    long checkpointId,
    long timestamp) throws Exception;

void restoreState(FSDataInputStream in) throws Exception;
```
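For illustration, a hedged sketch of an operator persisting a single counter through these stream-based methods, including a small format version as suggested above (the surrounding operator class and any actual Flink operator interface are omitted):

```
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;

class CountingOperatorSketch {

	private long count;

	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
		DataOutputStream view = new DataOutputStream(out);
		view.writeInt(1);      // serialization format version, so later changes stay readable
		view.writeLong(count); // the actual operator state
		view.flush();
	}

	public void restoreState(FSDataInputStream in) throws Exception {
		DataInputStream view = new DataInputStream(in);
		int version = view.readInt(); // real code would branch on the version
		count = view.readLong();
	}
}
```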
## c) Split task state into operator state (= non-partitioned state) and keyed state (= partitioned state)

We have split the task state into operator state and keyed state as 
follows.

Operator state is organized as a `ChainedStateHandle<StreamStateHandle>`. The 
chained state handle encapsulates the individual `StreamStateHandle`s for 
all operators in an operator chain.

Keyed state is organized as a `List<KeyGroupsStateHandle>`. Each 
`KeyGroupsStateHandle` consists of one `StreamStateHandle` and one 
`KeyGroupRangeOffs

[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422848#comment-15422848
 ] 

ASF GitHub Bot commented on FLINK-3755:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2376

[FLINK-3755] Introduce key groups for key-value state to support dynamic 
scaling

This pull request introduces the concept of key groups to Flink. A key 
group is the smallest assignable unit of key-value state to a stream operator. 
Differently said, it is a sub set of the key space which can be assigned to a 
stream operator. With the introduction of key groups, it will be possible to 
dynamically scale Flink operators that use partitioned (=key-value) state.

In particular, this pull request addresses the following sub-issues:
- fully: [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism 
Parameter
- partially: [FLINK-4381] Refactor State to Prepare For Key-Group State 
Backends

Furthermore, this pull request is partially based on pull request: #1988

Overall, this pull request introduces the following changes:

# 1) Adopted from #1988 (plus introduction of distributing keys as ranges 
(`KeyGroupRange`) 

## a) Introduction of KeyGroupAssigner

In order to partition keys into key groups, the`KeyGroupAssigner` interface 
is introduced. This allows for partitioning the key space into smaller units 
which can be assigned to operators. A scale up/down of parallelism is then 
performed by simply reassigning the key groups to more/less operators.

In this pull request, the former `HashPartitioner` is renamed to 
`KeyGroupStreamPartitioner` and uses the `KeyGroupAssigner` to distribute the 
streaming records consistently with respect to the key group mappings. The key 
groups, in turn, are mapped as ranges of key groups to the downstream tasks 
(`key group index * parallelism / number of key groups` = outgoing channel), 
as sketched below.
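
As an illustration of this mapping, here is a minimal, self-contained sketch; 
the class and helper names are hypothetical, and only the range formula above is 
taken from this description:

```
import java.util.Objects;

/** Illustrative key-group assignment: key -> key group -> outgoing channel. */
public final class KeyGroupAssignmentSketch {

	/** Hypothetical helper: assigns a key to one of {@code numberOfKeyGroups} key groups. */
	static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
		// Flink uses a hash-based assignment internally; plain hashCode suffices for this sketch.
		return Math.abs(Objects.hashCode(key) % numberOfKeyGroups);
	}

	/** Maps a key group to a downstream channel using the range formula from the description. */
	static int keyGroupToChannel(int keyGroupIndex, int parallelism, int numberOfKeyGroups) {
		return keyGroupIndex * parallelism / numberOfKeyGroups;
	}

	public static void main(String[] args) {
		int numberOfKeyGroups = 128; // maximum parallelism
		int parallelism = 4;         // current parallelism
		String key = "user-42";

		int keyGroup = assignToKeyGroup(key, numberOfKeyGroups);
		int channel = keyGroupToChannel(keyGroup, parallelism, numberOfKeyGroups);
		System.out.printf("key=%s -> keyGroup=%d -> channel=%d%n", key, keyGroup, channel);
	}
}
```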

When restoring from a checkpoint or savepoint, scaling the parallelism up or 
down essentially boils down to splitting or merging the key group ranges, in 
line with the adjusted assignment to operators that happens automatically 
through the `KeyGroupStreamPartitioner`.

## b) Introduction of MaxParallelism to user API

In order to scale programs up or down, it is necessary to define the 
maximum number of key groups. The maximum number of key groups determines the 
maximum parallelism of an operator, because every parallel operator instance 
needs at least one key group to get elements assigned. To specify this upper 
limit, the `ExecutionConfig` allows setting a job-wide max parallelism value via 
`ExecutionConfig.setMaxParallelism`. In addition, `SingleOutputStreamOperator` 
allows setting a max parallelism value on a per-operator basis, analogous to the 
regular parallelism. If neither an operator-level nor a job-wide max parallelism 
has been set, the parallelism of the operator is taken as its max parallelism; 
every parallel operator instance then receives a single key group. Currently, we 
restrict the maximum number of key groups to 2^15 (Short.MAX_VALUE).
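
A minimal usage sketch, under the assumption that the job-wide setter lives on 
`ExecutionConfig` and the per-operator setter on `SingleOutputStreamOperator` as 
described above; the exact method names may differ in the final API:

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Job-wide maximum parallelism, i.e. the maximum number of key groups (setter as described above).
		env.getConfig().setMaxParallelism(128);

		env.fromElements("flink", "key", "groups")
			.keyBy(new KeySelector<String, String>() {
				@Override
				public String getKey(String value) {
					return value;
				}
			})
			.map(new MapFunction<String, String>() {
				@Override
				public String map(String value) {
					return value.toUpperCase();
				}
			})
			.setParallelism(4)        // current parallelism
			.setMaxParallelism(128);  // per-operator upper limit (assumed setter on SingleOutputStreamOperator)

		env.execute("max-parallelism sketch");
	}
}
```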

# 2)  State and StateHandle refactoring

## a) StateHandle refactoring

We have simplified and cleaned up the hierarchy and use cases of state 
handles. `StreamStateHandle` and `RetrievableStateHandle` are now at the heart 
of the state handles system.
Their main conceptual difference is that `StreamStateHandle` provides a 
seekable input stream to the actual state data and leaves state reconstruction 
to client code, whereas `RetrievableStateHandle` offers a simple way for client 
code to retrieve state as a readily constructed object, with the state handle 
implementation taking care of the reconstruction.
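
For illustration, a simplified sketch of the two handle shapes; the interface 
and method names below are approximations for this description, not verbatim 
from the pull request:

```
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;

/** Sketch: hands out a stream over the state data; reconstruction is up to the caller. */
interface StreamStateHandleSketch extends Serializable {
	InputStream openInputStream() throws IOException;
}

/** Sketch: reconstructs and returns the state object itself. */
interface RetrievableStateHandleSketch<T extends Serializable> extends Serializable {
	T retrieveState() throws IOException, ClassNotFoundException;
}
```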

## b) Operator serialization

The unified abstraction for operators to persist their state is 
`CheckpointStateOutputStream`. All operators can simply write their serialized 
state directly into this stream, which returns a `StreamStateHandle` on close. 
`StreamTaskState` and `StreamTaskStateList` become obsolete. This change makes 
versioning of operator state serialization formats easier; we should ensure and 
test that our operators are aware of serialization versions.

This change leaves the following methods for snapshot/restore in 
`StreamOperator`:
```
void snapshotState(
		FSDataOutputStream out,
		long checkpointId,
		long timestamp) throws Exception;

void restoreState(FSDataInputStream in) throws Exception;
```
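
As an illustration of how an operator could write a versioned format through 
these streams, here is a minimal sketch; the operator and its state layout are 
hypothetical, only the `FSDataOutputStream`/`FSDataInputStream` parameters are 
taken from the methods above:

```
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;

/** Hypothetical operator state: a single counter, written with an explicit format version. */
public class CountingOperatorStateSketch {

	private static final int FORMAT_VERSION = 1;

	private long count;

	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
		// Do not close the wrapper; the checkpoint stream is owned by the framework.
		DataOutputStream dos = new DataOutputStream(out);
		dos.writeInt(FORMAT_VERSION); // version first, so the format can evolve later
		dos.writeLong(count);
		dos.flush();
	}

	public void restoreState(FSDataInputStream in) throws Exception {
		DataInputStream dis = new DataInputStream(in);
		int version = dis.readInt();
		switch (version) {
			case 1:
				count = dis.readLong();
				break;
			default:
				throw new IllegalStateException("Unknown state format version: " + version);
		}
	}
}
```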
## c) Split task state into operator state (= non-partitioned state) and 
keyed-state (= partitioned state)

We have split the task state into operator state and keyed state as follows.

Operator state is organized as a `ChainedStateHandle`.

[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-16 Thread cresny
Github user cresny commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74957429
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
 ---
@@ -62,4 +64,34 @@ public void run() {
throw asyncException.f0;
}
}
+
+	/**
+	 * Ensure that target path terminates with a new directory to be created by fs.
+	 * If remoteURI does not specify a new directory, append local directory name.
+	 * @param fs
+	 * @param localPath
+	 * @param remoteURI
+	 * @return
+	 * @throws IOException
+	 */
+	protected static URI checkInitialDirectory(final FileSystem fs,final File localPath, final URI remoteURI) throws IOException {
+		if (localPath.isDirectory()) {
+			Path remotePath = new Path(remoteURI);
+			if (fs.exists(remotePath)) {
+				return new Path(remotePath,localPath.getName()).toUri();
+			}
+		}
+		return remoteURI;
+	}
+
+	protected static void copyFromLocalFile(final FileSystem fs, final File localPath, final URI remotePath) throws Exception {
--- End diff --

Ufuk, I'm a bit confused about this. flink-core FileSystem does not 
implement copyFromLocal, and both callers of this are using the native hadoop 
FileSystem. It would be great not to copy code, but it appears all hadoop 
dependencies are abstracted out of flink-core, so it would not be there.




[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422901#comment-15422901
 ] 

ASF GitHub Bot commented on FLINK-3755:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2376
  
R: @tillrohrmann @StephanEwen for review pls


> Introduce key groups for key-value state to support dynamic scaling
> ---
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> In order to support dynamic scaling, it is necessary to sub-partition the 
> key-value states of each operator. This sub-partitioning, which produces a 
> set of key groups, allows to easily scale in and out Flink jobs by simply 
> reassigning the different key groups to the new set of sub tasks. The idea of 
> key groups is described in this design document [1]. 
> [1] 
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing





[GitHub] flink issue #2376: [FLINK-3755] Introduce key groups for key-value state to ...

2016-08-16 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2376
  
R: @tillrohrmann @StephanEwen for review pls




[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74961561
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
 ---
@@ -62,4 +64,34 @@ public void run() {
throw asyncException.f0;
}
}
+
+	/**
+	 * Ensure that target path terminates with a new directory to be created by fs.
+	 * If remoteURI does not specify a new directory, append local directory name.
+	 * @param fs
+	 * @param localPath
+	 * @param remoteURI
+	 * @return
+	 * @throws IOException
+	 */
+	protected static URI checkInitialDirectory(final FileSystem fs,final File localPath, final URI remoteURI) throws IOException {
+		if (localPath.isDirectory()) {
+			Path remotePath = new Path(remoteURI);
+			if (fs.exists(remotePath)) {
+				return new Path(remotePath,localPath.getName()).toUri();
+			}
+		}
+		return remoteURI;
+	}
+
+	protected static void copyFromLocalFile(final FileSystem fs, final File localPath, final URI remotePath) throws Exception {
--- End diff --

Sorry about this! Actually I was confused. I didn't see that 
`HDFSCopyFromLocal` is using `org.apache.hadoop.fs.FileSystem` and not the 
Flink `FileSystem`. So it should be fine to have it in 
`flink-hadoop-compatibility` instead. What do you think?




[GitHub] flink pull request #2358: [FLINK-4382] Buffer rpc calls until the RpcEndpoin...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2358



