[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16244088#comment-16244088
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4729


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Reporter: Till Rohrmann
> Assignee: Shuyi Chen
> Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243803#comment-16243803
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r149636445
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +242,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
     }
 
     @Override
-    public boolean stopWorker(ResourceID resourceID) {
-        // TODO: Implement to stop the worker
-        return false;
+    public boolean stopWorker(YarnWorkerNode workerNode) {
+        if (workerNode != null) {
+            Container container = workerNode.getContainer();
+            log.info("Stopping container {}.", container.getId().toString());
--- End diff --

`toString` call unnecessary.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243264#comment-16243264
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/4729
  
@tillrohrmann I updated the PR to address your comments. Could you please 
take another look when you have time?




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243101#comment-16243101
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r149540088
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241718#comment-16241718
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r149307271
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

Remove the type constraint in `ResourceManager`, I would suggest.
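
To make the suggestion concrete, here is a minimal sketch of what dropping the bound on the worker type parameter could look like. Every name in it (`HasResourceId`, `UnboundedManager`, `PlainWorker`, `PlainManager`) is a hypothetical stand-in and not a Flink class; it only shows that, without an `extends Serializable` bound, a worker type no longer has to implement `Serializable`.

```java
/** Hypothetical stand-in for an interface such as ResourceIDRetrievable. */
interface HasResourceId {
    String getResourceId();
}

/** Hypothetical manager whose worker type parameter carries no Serializable bound. */
abstract class UnboundedManager<W> {
    abstract boolean stopWorker(W worker);
}

/** A worker type that deliberately does not implement java.io.Serializable. */
final class PlainWorker implements HasResourceId {
    private final String resourceId;

    PlainWorker(String resourceId) {
        this.resourceId = resourceId;
    }

    @Override
    public String getResourceId() {
        return resourceId;
    }
}

/** Compiles only because W is unbounded; with "W extends Serializable" it would be rejected. */
final class PlainManager extends UnboundedManager<PlainWorker> {
    @Override
    boolean stopWorker(PlainWorker worker) {
        // Assumed convention for this sketch: report false when there is nothing to stop.
        return worker != null;
    }
}
```

With the bound in place, the worker class would need `implements Serializable` only to satisfy the compiler, which is the situation the diff above is in.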




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241696#comment-16241696
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r149304158
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

@tillrohrmann But in ResourceManager.java, WorkerType has to be 
Serializable (i.e., `ResourceManager<WorkerType extends Serializable>`). Do you 
have any recommendation?




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237763#comment-16237763
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148812132
  

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237749#comment-16237749
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808721
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -63,11 +64,14 @@
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {
 
     /** The process environment variables. */
     private final Map<String, String> env;
 
+    /** YARN container map. Package private for unit test purposes. */
+    final Map<String, YarnWorkerNode> workerNodeMap;
--- End diff --

Let's make the key `ResourceID`


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Reporter: Till Rohrmann
> Assignee: Shuyi Chen
> Priority: Major
> Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237761#comment-16237761
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810490
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+    private Container yarnContainer;
+
+    public YarnWorkerNode(Container container) {
+        this.yarnContainer = container;
--- End diff --

`Preconditions.checkNotNull`.
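
A minimal sketch of the fail-fast null check being asked for in the constructor. To keep it self-contained it uses the JDK's `java.util.Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`, and `ContainerStub` and `WorkerNodeSketch` are hypothetical stand-ins for the YARN `Container` and for `YarnWorkerNode`; it is not the code from the pull request.

```java
import java.util.Objects;

/** Hypothetical stand-in for the YARN Container record. */
final class ContainerStub {
    private final String id;

    ContainerStub(String id) {
        this.id = Objects.requireNonNull(id, "id must not be null");
    }

    String getId() {
        return id;
    }
}

/** Hypothetical stand-in for YarnWorkerNode with the null check applied. */
final class WorkerNodeSketch {
    // Made final here as an assumption: nothing in the sketch reassigns it.
    private final ContainerStub yarnContainer;

    WorkerNodeSketch(ContainerStub container) {
        // Fail at construction time instead of on a later getContainer() call.
        this.yarnContainer = Objects.requireNonNull(container, "container must not be null");
    }

    ContainerStub getContainer() {
        return yarnContainer;
    }
}
```

Checking in the constructor means a `null` container surfaces where the node is created, rather than later when `stopWorker` dereferences it.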




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237750#comment-16237750
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808362
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
     }
 
     @Override
-    public boolean stopWorker(ResourceID resourceID) {
-        // TODO: Implement to stop the worker
-        return false;
+    public boolean stopWorker(YarnWorkerNode workerNode) {
+        workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Let's remove the workerNode after we have stopped it.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237748#comment-16237748
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148809690
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -260,6 +287,7 @@ public void onContainersAllocated(List<Container> containers) {
             numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
             log.info("Received new container: {} - Remaining pending container requests: {}",
                 container.getId(), numPendingContainerRequests);
+            workerNodeMap.put(container.getId().toString(), new YarnWorkerNode(container));
--- End diff --

Here we should create a `ResourceID` to use as the key. This 
`ResourceID` must then be sent to the launched TaskManager so that it uses it 
upon registration.
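
A rough sketch of the idea in this comment: key the worker map by a dedicated ID type created at allocation time, and look the worker up again with the ID the TaskManager presents on registration. `ResourceIdStub`, `ResourceIdKeySketch` and both method names are hypothetical, and deriving the ID from the container ID string is only an assumption made for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class ResourceIdKeySketch {

    /** Hypothetical stand-in for ResourceID: a value type usable as a map key. */
    static final class ResourceIdStub {
        private final String value;

        ResourceIdStub(String value) {
            this.value = Objects.requireNonNull(value, "value must not be null");
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof ResourceIdStub && ((ResourceIdStub) other).value.equals(value);
        }

        @Override
        public int hashCode() {
            return value.hashCode();
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /** Maps the ID to the container it was created for (a plain String in this sketch). */
    private final Map<ResourceIdStub, String> workerNodeMap = new HashMap<>();

    /** Creates the ID when the container arrives; the caller would hand it to the launched worker. */
    ResourceIdStub onContainerAllocated(String containerId) {
        ResourceIdStub id = new ResourceIdStub(containerId); // assumption: ID derived from the container ID
        workerNodeMap.put(id, containerId);
        return id;
    }

    /** Looks up the container using the ID the worker sends back on registration; null if unknown. */
    String workerStarted(ResourceIdStub registeredId) {
        return workerNodeMap.get(registeredId);
    }
}
```

Because the key type implements `equals` and `hashCode` by value, the ID received over the wire finds the entry stored at allocation time, without relying on a `toString()` representation matching.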




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237747#comment-16237747
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808279
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
     }
 
     @Override
-    public boolean stopWorker(ResourceID resourceID) {
-        // TODO: Implement to stop the worker
-        return false;
+    public boolean stopWorker(YarnWorkerNode workerNode) {
+        workerNodeMap.remove(workerNode.getResourceID().toString());
+        if (workerNode != null) {
--- End diff --

I think this should be checked first.
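
The ordering problem can be sketched as follows: in the diff, `workerNode.getResourceID()` is dereferenced before the `null` check, so a `null` argument would throw before the check is reached. The sketch below checks first, stops the worker, and only then drops the map entry, which also follows the separate review comment asking for removal after stopping. `StopWorkerSketch`, `Worker`, the `released` flag and the return values are hypothetical stand-ins, not the pull request's code.

```java
import java.util.HashMap;
import java.util.Map;

final class StopWorkerSketch {

    /** Hypothetical stand-in for YarnWorkerNode. */
    static final class Worker {
        final String resourceId;
        boolean released;

        Worker(String resourceId) {
            this.resourceId = resourceId;
        }
    }

    final Map<String, Worker> workerNodeMap = new HashMap<>();

    boolean stopWorker(Worker workerNode) {
        // 1. Check for null before touching the argument.
        if (workerNode == null) {
            return false; // assumed convention: nothing was stopped
        }
        // 2. Stop the worker; this flag stands in for releasing the container.
        workerNode.released = true;
        // 3. Forget the worker only after it has been stopped.
        workerNodeMap.remove(workerNode.resourceId);
        return true;
    }
}
```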




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237755#comment-16237755
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814129
  

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237760#comment-16237760
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814354
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237746#comment-16237746
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148808482
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,14 +241,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public boolean stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
-   return false;
+   public boolean stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
+   Container container = workerNode.getYarnContainer();
+   log.info("Stopping container {}.", container.getId().toString());
+   // release the container on the node manager
+   try {
+   nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+   } catch (Throwable t) {
+   log.error("Error while calling YARN Node Manager to stop container", t);
--- End diff --

this should be a warning
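
For illustration, a minimal sketch of the shape this comment points toward: a failed stop request is logged at warning level rather than error level, and the worker node is checked for null before it is dereferenced. This is not the code from the pull request. `StopWorkerSketch`, `ContainerStopper`, and `WorkerNode` are hypothetical stand-ins for `YarnResourceManager`, the node manager client, and `YarnWorkerNode`. `java.util.logging` is used in place of SLF4J only to keep the sketch dependency-free, and returning `false` for a null worker is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Illustrative sketch only; all types here are hypothetical stand-ins. */
class StopWorkerSketch {

    /** Hypothetical stand-in for the YARN node manager client. */
    interface ContainerStopper {
        void stopContainer(String containerId, String nodeId) throws Exception;
    }

    /** Hypothetical stand-in for YarnWorkerNode. */
    static final class WorkerNode {
        final String resourceId;
        final String containerId;
        final String nodeId;

        WorkerNode(String resourceId, String containerId, String nodeId) {
            this.resourceId = resourceId;
            this.containerId = containerId;
            this.nodeId = nodeId;
        }
    }

    private static final Logger LOG = Logger.getLogger(StopWorkerSketch.class.getName());

    private final Map<String, WorkerNode> workerNodeMap = new HashMap<>();
    private final ContainerStopper stopper;

    StopWorkerSketch(ContainerStopper stopper) {
        this.stopper = stopper;
    }

    void register(WorkerNode node) {
        workerNodeMap.put(node.resourceId, node);
    }

    int trackedWorkers() {
        return workerNodeMap.size();
    }

    boolean stopWorker(WorkerNode workerNode) {
        // Check for null before touching the node, not after.
        if (workerNode == null) {
            return false; // assumption: nothing to stop counts as "not stopped"
        }
        LOG.log(Level.INFO, "Stopping container {0}.", workerNode.containerId);
        try {
            stopper.stopContainer(workerNode.containerId, workerNode.nodeId);
        } catch (Exception e) {
            // A failed stop request is not fatal here, so it is logged as a warning.
            LOG.log(Level.WARNING, "Error while calling the node manager to stop container", e);
        }
        // Stop tracking the worker whether or not the stop request succeeded.
        workerNodeMap.remove(workerNode.resourceId);
        return true;
    }
}
```

Whether the worker should still be removed from the map when the stop request fails is a design choice. The sketch removes it unconditionally.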


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Reporter: Till Rohrmann
> Assignee: Shuyi Chen
> Priority: Major
> Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237757#comment-16237757
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148812650
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237762#comment-16237762
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814381
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237759#comment-16237759
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811956
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237751#comment-16237751
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811288
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237764#comment-16237764
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148814557
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237758#comment-16237758
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148813642
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237745#comment-16237745
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148807573
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -63,11 +64,14 @@
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
  */
-public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
+public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler {
 
/** The process environment variables. */
private final Map env;
 
+   /** YARN container map. Package private for unit test purposes. */
+   final Map workerNodeMap;
--- End diff --

Let's make it more visible that this map is a concurrent map by setting the 
type to `ConcurrentHashMap`.
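
A minimal sketch of this suggestion, assuming a simplified stand-in class (`WorkerRegistrySketch` and its `String` keys and values are hypothetical, not the types used in the pull request). Declaring the field with the concurrent type, rather than plain `Map`, shows the thread-safety guarantee at the declaration:

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the resource manager; not Flink code. */
class WorkerRegistrySketch {

    /** Declared as ConcurrentHashMap so readers see the concurrency guarantee. */
    final ConcurrentHashMap<String, String> workerNodeMap = new ConcurrentHashMap<>();

    /** Remembers which container backs the given resource id. */
    void register(String resourceId, String containerId) {
        workerNodeMap.put(resourceId, containerId);
    }

    /** Atomically removes the entry; returns the container id, or null if unknown. */
    String unregister(String resourceId) {
        return workerNodeMap.remove(resourceId);
    }
}
```

Declaring the field as the `ConcurrentMap` interface would likely convey the same intent while leaving the implementation open.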


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237754#comment-16237754
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811085
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237756#comment-16237756
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810594
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+   private Container yarnContainer;
+
+   public YarnWorkerNode(Container container) {
+   this.yarnContainer = container;
+   }
+
+   @Override
+   public ResourceID getResourceID() {
+   return new ResourceID(yarnContainer.getId().toString());
--- End diff --

Let's store the `ResourceID` explicitly.
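
A minimal sketch of what storing the ID explicitly could look like. All three classes are hypothetical stand-ins for Flink's `ResourceID`, YARN's `Container`, and the `YarnWorkerNode` under review; the point is only that the ID is derived once in the constructor instead of being rebuilt on every `getResourceID()` call:

```java
import java.util.Objects;

/** Hypothetical stand-in for Flink's ResourceID. */
final class SketchResourceID {
    private final String id;

    SketchResourceID(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public String toString() {
        return id;
    }
}

/** Hypothetical stand-in for YARN's Container; it only carries an id. */
final class SketchContainer {
    private final String containerId;

    SketchContainer(String containerId) {
        this.containerId = Objects.requireNonNull(containerId);
    }

    String getId() {
        return containerId;
    }
}

/** Worker node that computes its resource id once and stores it. */
final class SketchWorkerNode {
    private final SketchContainer container;
    private final SketchResourceID resourceID;

    SketchWorkerNode(SketchContainer container) {
        this.container = Objects.requireNonNull(container);
        // Derived once here; getResourceID() returns the stored instance.
        this.resourceID = new SketchResourceID(container.getId());
    }

    SketchResourceID getResourceID() {
        return resourceID;
    }

    SketchContainer getContainer() {
        return container;
    }
}
```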


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237752#comment-16237752
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148811663
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237753#comment-16237753
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810078
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

`serialVersionUID` is missing.
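
For illustration, a minimal sketch of a serializable class that declares the field. `SerializableWorkerInfo` is a hypothetical class, not the one from the pull request, and the value `1L` is an arbitrary choice:

```java
import java.io.Serializable;

/** Hypothetical serializable value class; not the class from the pull request. */
class SerializableWorkerInfo implements Serializable {

    // Explicit version id, so the serialized form does not depend on a
    // compiler-generated default that can change when the class is recompiled.
    private static final long serialVersionUID = 1L;

    private final String containerId;

    SerializableWorkerInfo(String containerId) {
        this.containerId = containerId;
    }

    String getContainerId() {
        return containerId;
    }
}
```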


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237744#comment-16237744
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r148810409
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
--- End diff --

Actually, this class should not be `Serializable`, because `Container` is 
not serializable either.
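
A small sketch of why this matters, using hypothetical stand-in classes (`OpaqueContainer` plays the role of a type that does not implement `Serializable`). A class that declares `Serializable` but holds such a field compiles fine and then fails at runtime when Java serialization is attempted:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a type that does not implement Serializable. */
class OpaqueContainer {
    final String id;

    OpaqueContainer(String id) {
        this.id = id;
    }
}

/** Declares Serializable but holds a non-serializable, non-transient field. */
class BrokenWorkerNode implements Serializable {
    private static final long serialVersionUID = 1L;

    private final OpaqueContainer container;

    BrokenWorkerNode(OpaqueContainer container) {
        this.container = container;
    }
}

class SerializationCheck {

    /** Returns true if Java serialization of the given object succeeds. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object graph contains a non-serializable instance.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Here `SerializationCheck.canSerialize(new BrokenWorkerNode(new OpaqueContainer("c1")))` returns `false`, which is the kind of late failure that dropping the `Serializable` marker avoids.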


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216430#comment-16216430
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/4729
  
@tillrohrmann can you take another look? I've updated the code as well as 
the test. Thanks.


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16209186#comment-16209186
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145388074
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Yes, this is correct. Let's keep the `workerNodeMap`. But then we have to 
make sure that we always maintain a consistent view of the currently 
registered workers (e.g. by properly removing entries upon failure).
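
A minimal sketch of what a consistent view could mean in practice. `WorkerBookkeeping` and its method names are hypothetical and do not mirror Flink or YARN callbacks exactly; the idea is that every path on which a worker disappears, whether a deliberate stop or a reported failure, removes the map entry:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical bookkeeping class; names are illustrative, not Flink or YARN API. */
class WorkerBookkeeping {

    private final ConcurrentMap<String, String> workerNodeMap = new ConcurrentHashMap<>();

    /** Records a newly started worker. */
    void onContainerStarted(String resourceId, String containerId) {
        workerNodeMap.put(resourceId, containerId);
    }

    /** Deliberate stop: drops the entry and reports whether the worker was known. */
    boolean stopWorker(String resourceId) {
        return workerNodeMap.remove(resourceId) != null;
    }

    /** Failure path: workers reported as completed are dropped as well. */
    void onContainersCompleted(List<String> resourceIds) {
        for (String resourceId : resourceIds) {
            workerNodeMap.remove(resourceId);
        }
    }

    /** Number of workers currently tracked. */
    int registeredWorkers() {
        return workerNodeMap.size();
    }
}
```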


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16208868#comment-16208868
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145325673
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Since we will need to keep `workerNodeMap` anyway, we can just look up the 
containerId in `workerNodeMap` using the resourceId. There is no need to 
compute the containerId from the resourceId, so I think we don't need 
`ContainerId.fromString`. Also, copying the code would cause compatibility 
issues if the `fromString` method differs between Hadoop versions. What do you think?
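
The lookup-based approach can be sketched roughly as follows. This is a simplified illustration: the `String` ids stand in for `ResourceID` and `ContainerId`, and the `released` list is a hypothetical placeholder for the call to the YARN client:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch; the String ids stand in for ResourceID and ContainerId. */
class LookupBasedRelease {

    private final ConcurrentMap<String, String> containerIdByResourceId = new ConcurrentHashMap<>();

    /** Records released container ids; stands in for a call to the YARN client. */
    final List<String> released = new ArrayList<>();

    /** Remembers the container id when the worker is created. */
    void track(String resourceId, String containerId) {
        containerIdByResourceId.put(resourceId, containerId);
    }

    /** Releases the worker's container; returns false if the worker is unknown. */
    boolean stopWorker(String resourceId) {
        // Look up the stored container id instead of parsing it from the resource id.
        String containerId = containerIdByResourceId.remove(resourceId);
        if (containerId == null) {
            return false;
        }
        released.add(containerId);
        return true;
    }
}
```

Because the container id is stored when the worker is tracked, no string parsing is involved, so the sketch does not depend on how any particular Hadoop version formats or parses container ids.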


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207192#comment-16207192
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145067245
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
+   log.info("Stopping container {}.", workerNode.getYarnContainer().getId().toString());
+   resourceManagerClient.releaseAssignedContainer(workerNode.getYarnContainer().getId());
--- End diff --

Yes, I think that's also how we do it in the old `YarnFlinkResourceManager`.


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207190#comment-16207190
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145067180
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
+   log.info("Stopping container {}.", workerNode.getYarnContainer().getId().toString());
+   resourceManagerClient.releaseAssignedContainer(workerNode.getYarnContainer().getId());
--- End diff --

So I think the `workerNodeMap` is necessary to keep the mapping from 
resourceId to `YarnWorkerNode`, since the resourceId does not carry the nodeId 
information. What do you think?


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207189#comment-16207189
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145067095
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+   private Container yarnContainer;
--- End diff --

True.


> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207166#comment-16207166
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145061149
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

Yes, we do. If the application id did not change, we could simply copy the 
code, but we would have to add an attribution to it.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207149#comment-16207149
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145054746
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+   private Container yarnContainer;
--- End diff --

To call stopContainer, we also need the NodeId from the YARN Container.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207148#comment-16207148
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145054599
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
+   log.info("Stopping container {}.", workerNode.getYarnContainer().getId().toString());
+   resourceManagerClient.releaseAssignedContainer(workerNode.getYarnContainer().getId());
--- End diff --

Good point. In that case, I think YarnWorkerNode needs to keep the returned 
Container so that we can retrieve both the containerId and the nodeId to call 
stopContainer.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207147#comment-16207147
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r145053774
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

This API is only available from Hadoop 2.6 onwards. Don't we need to support 
2.3 and 2.4 as well?




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200021#comment-16200021
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143959669
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

I might have already asked you this, but looking at the code, I think we 
initialize the `ResourceID` on the `TaskManager` with `ContainerId.toString`. 
Can't we use this to reconstruct the `ContainerId` similar to 
http://hadoop.apache.org/docs/r2.8.0/api/org/apache/hadoop/yarn/api/records/ContainerId.html#fromString(java.lang.String)?
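
For illustration, the round trip suggested here can be sketched without Hadoop 
on the classpath. `ContainerIdSketch` below is a hypothetical stand-in, not 
Hadoop's `ContainerId`, and it assumes the plain 
`container_<clusterTimestamp>_<appId>_<attemptId>_<containerId>` string layout; 
any other layout (for example an epoch-prefixed one) is rejected rather than 
guessed at.

```java
import java.util.Locale;

/** Hypothetical stand-in for illustration only; this is not Hadoop's ContainerId. */
final class ContainerIdSketch {
    final long clusterTimestamp;
    final int applicationId;
    final int attemptId;
    final long containerSequence;

    private ContainerIdSketch(long clusterTimestamp, int applicationId,
                              int attemptId, long containerSequence) {
        this.clusterTimestamp = clusterTimestamp;
        this.applicationId = applicationId;
        this.attemptId = attemptId;
        this.containerSequence = containerSequence;
    }

    /** Parses the assumed layout container_<clusterTimestamp>_<appId>_<attemptId>_<containerId>. */
    static ContainerIdSketch fromString(String value) {
        String[] parts = value.split("_");
        if (parts.length != 5 || !"container".equals(parts[0])) {
            throw new IllegalArgumentException("Unexpected container id layout: " + value);
        }
        try {
            return new ContainerIdSketch(
                Long.parseLong(parts[1]),
                Integer.parseInt(parts[2]),
                Integer.parseInt(parts[3]),
                Long.parseLong(parts[4]));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Non-numeric field in container id: " + value, e);
        }
    }

    /** Renders the id back into the same assumed layout, so fromString(toString()) round-trips. */
    @Override
    public String toString() {
        return String.format(Locale.ROOT, "container_%d_%04d_%02d_%06d",
            clusterTimestamp, applicationId, attemptId, containerSequence);
    }
}
```

If the `ResourceID` really is just the container id string, a parse like this 
would make a separate lookup structure unnecessary for the container id alone; 
it would not recover the `NodeId`.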




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200022#comment-16200022
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143958924
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
+   if (workerNode != null) {
+   log.info("Stopping container {}.", workerNode.getYarnContainer().getId().toString());
+   resourceManagerClient.releaseAssignedContainer(workerNode.getYarnContainer().getId());
--- End diff --

I think we should also call `nodeManagerClient.stopContainer()` before 
releasing the container.
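
A minimal sketch of that ordering, assuming nothing about the final patch: the 
two client interfaces and `WorkerNode` below are hypothetical stand-ins that 
use plain strings instead of the YARN `ContainerId` and `NodeId` types, so the 
example compiles on its own. The null check comes first, then the stop on the 
node manager side, then the release on the resource manager side.

```java
/** Illustrative only: stop the container on its node first, then release it. */
final class StopWorkerSketch {

    /** Hypothetical stand-in for the node manager client. */
    interface NodeManagerClient {
        void stopContainer(String containerId, String nodeId) throws Exception;
    }

    /** Hypothetical stand-in for the resource manager client. */
    interface ResourceManagerClient {
        void releaseAssignedContainer(String containerId);
    }

    /** Hypothetical worker record carrying both ids needed for the two calls. */
    static final class WorkerNode {
        final String containerId;
        final String nodeId;

        WorkerNode(String containerId, String nodeId) {
            this.containerId = containerId;
            this.nodeId = nodeId;
        }
    }

    private final NodeManagerClient nodeManagerClient;
    private final ResourceManagerClient resourceManagerClient;

    StopWorkerSketch(NodeManagerClient nodeManagerClient,
                     ResourceManagerClient resourceManagerClient) {
        this.nodeManagerClient = nodeManagerClient;
        this.resourceManagerClient = resourceManagerClient;
    }

    /** Returns false when there is nothing to stop, true once the release was issued. */
    boolean stopWorker(WorkerNode workerNode) {
        if (workerNode == null) {
            return false;
        }
        try {
            nodeManagerClient.stopContainer(workerNode.containerId, workerNode.nodeId);
        } catch (Exception e) {
            // Assumption of this sketch: stopping is best effort, so we still release below.
        }
        resourceManagerClient.releaseAssignedContainer(workerNode.containerId);
        return true;
    }
}
```

Whether a failed stop should block the release is a design choice; this sketch 
releases anyway so that the allocation is not held indefinitely.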




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199923#comment-16199923
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143934519
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

@tillrohrmann I think we need the workerNodeMap because the task executor 
only has the resourceID when it tries to register with the resource manager. 
So the resource manager has to look up the YarnWorkerNode by resourceID when 
registerTaskExecutor is called. You can look at the 
workerStarted(ResourceID resourceID) method.
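
The lookup described here can be sketched as follows. This is an illustrative 
outline, not the code in the PR: `WorkerRegistrySketch`, its `WorkerNode`, and 
`onContainerAllocated` are hypothetical names, and the resource id is assumed 
to be the container id string.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative registry keyed by resource id string; not the PR's implementation. */
final class WorkerRegistrySketch {

    /** Hypothetical worker record. */
    static final class WorkerNode {
        final String resourceId;
        final String containerId;

        WorkerNode(String resourceId, String containerId) {
            this.resourceId = resourceId;
            this.containerId = containerId;
        }
    }

    private final Map<String, WorkerNode> workerNodeMap = new ConcurrentHashMap<>();

    /** Remember the worker when its container is allocated (assumed: resource id == container id string). */
    void onContainerAllocated(String containerId) {
        workerNodeMap.put(containerId, new WorkerNode(containerId, containerId));
    }

    /** Called at registration time, when only the resource id is known. */
    Optional<WorkerNode> workerStarted(String resourceId) {
        return Optional.ofNullable(workerNodeMap.get(resourceId));
    }

    /** Forget the worker and hand back its record so the caller can release the container. */
    Optional<WorkerNode> stopWorker(String resourceId) {
        return Optional.ofNullable(workerNodeMap.remove(resourceId));
    }
}
```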




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199925#comment-16199925
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143934625
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import 

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198410#comment-16198410
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143666272
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -227,13 +241,19 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}
 
@Override
-   public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   public void stopWorker(YarnWorkerNode workerNode) {
+   workerNodeMap.remove(workerNode.getResourceID().toString());
--- End diff --

If we store the container id in `YarnWorkerNode`, then we don't need the 
`workerNodeMap`.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198412#comment-16198412
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143666417
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,439 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198411#comment-16198411
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143672401
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -0,0 +1,439 @@

[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198409#comment-16198409
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r143666153
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java ---
@@ -0,0 +1,47 @@
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.io.Serializable;
+
+/**
+ * A stored YARN worker, which contains the YARN container.
+ */
+public class YarnWorkerNode implements ResourceIDRetrievable, Serializable {
+
+   private Container yarnContainer;
--- End diff --

I think we should store the container id here.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193795#comment-16193795
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

GitHub user suez1224 reopened a pull request:

https://github.com/apache/flink/pull/4729

[FLINK-7076] [ResourceManager] implement YARN stopWorker logic

## What is the purpose of the change

*Implement stopWorker logic for YarnResourceManager*


## Brief change log

  - *Added a ConcurrentHashMap to keep the ResourceID to YARN ContainerId mappings*
  - *Implemented the stopWorker logic for YARN*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink implement-stopWorker-yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4729






> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Shuyi Chen
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16193570#comment-16193570
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 closed the pull request at:

https://github.com/apache/flink/pull/4729




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16187836#comment-16187836
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r142100776
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   ContainerId containerId = containerIdMap.remove(resourceID.getResourceIdString());
+   if (containerId != null) {
+       log.info("Stopping container {}.", containerId.toString());
+       resourceManagerClient.releaseAssignedContainer(containerId);
--- End diff --

Then we should change the `Worker` type to also include the Mesos 
`ContainerId` and pass it to the `stopWorker` method.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16187835#comment-16187835
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r142100679
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
--- End diff --

Yes exactly.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185544#comment-16185544
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141822004
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
--- End diff --

Do you mean something like MesosWorkerStore.Worker in MesosResourceManager, 
which has both the taskId (used as resourceID) and other context info for the 
Mesos worker?




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16185532#comment-16185532
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141820967
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   ContainerId containerId = containerIdMap.remove(resourceID.getResourceIdString());
+   if (containerId != null) {
+       log.info("Stopping container {}.", containerId.toString());
+       resourceManagerClient.releaseAssignedContainer(containerId);
--- End diff --

The ContainerId for YARN can only be generated using 
ContainerId.newInstance, which requires the application attempt ID and the 
container ID. The resourceID contains only the container ID information, so 
it is not sufficient to rebuild the YARN ContainerId object that the 
releaseAssignedContainer method takes.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182266#comment-16182266
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141294374
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
--- End diff --

I actually think that we should refactor this method to take a parameter of 
generic type `WorkerType`, which contains all the framework-specific 
registration information. In the current `YarnResourceManager` implementation, 
it would be `ResourceID`, which is generated from the container ID. Thus, 
we should be able to retrieve the `containerId` from it.
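
A minimal sketch of what such a refactoring could look like, assuming a base class that is generic over the worker type. `YarnWorkerNode` and `getContainer()` match the names used in a later revision of this pull request; everything else (`ResourceManagerSketch`, `Container`, the `released` list) is a hypothetical stand-in and not the actual Flink or Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: all types here are simplified stand-ins, not Flink classes.
class WorkerTypeSketch {

    /** Base class parameterized over the framework-specific worker type. */
    abstract static class ResourceManagerSketch<WorkerType> {
        /** Releases the given worker; returns true if a release was issued. */
        abstract boolean stopWorker(WorkerType worker);
    }

    /** Hypothetical stand-in for a YARN container. */
    static final class Container {
        final String id;

        Container(String id) {
            this.id = id;
        }
    }

    /** YARN worker that carries its container, so no lookup by ResourceID is needed. */
    static final class YarnWorkerNode {
        private final Container container;

        YarnWorkerNode(Container container) {
            this.container = container;
        }

        Container getContainer() {
            return container;
        }
    }

    /** YARN-specific manager; records released container ids instead of calling YARN. */
    static final class YarnResourceManagerSketch extends ResourceManagerSketch<YarnWorkerNode> {
        final List<String> released = new ArrayList<>();

        @Override
        boolean stopWorker(YarnWorkerNode worker) {
            if (worker == null) {
                return false;
            }
            // A real implementation would hand the container back to YARN here.
            released.add(worker.getContainer().id);
            return true;
        }
    }
}
```

With this shape, the framework-specific information travels with the worker object, so the resource manager does not have to reconstruct a container identifier from a `ResourceID` string.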




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182267#comment-16182267
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4729#discussion_r141293920
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -228,7 +234,14 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 
@Override
public void stopWorker(ResourceID resourceID) {
-   // TODO: Implement to stop the worker
+   ContainerId containerId = containerIdMap.remove(resourceID.getResourceIdString());
+   if (containerId != null) {
+       log.info("Stopping container {}.", containerId.toString());
+       resourceManagerClient.releaseAssignedContainer(containerId);
--- End diff --

I think in the Yarn case, the `resourceID` is generated from the 
stringified version of the `containerId`. Thus, you should be able to retrieve 
it from the provided `resourceID`.




[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182016#comment-16182016
 ] 

ASF GitHub Bot commented on FLINK-7076:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/4729

[FLINK-7076] [ResourceManager] implement YARN stopWorker logic

## What is the purpose of the change

*Implement stopWorker logic for YarnResourceManager*


## Brief change log

  - *Added a ConcurrentHashMap to keep the ResourceID to Yarn ContainerId 
mappings*
  - *Implement the stopWorker logic for YARN*
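
The two changes above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in the pull request: `ContainerHandle` and `ContainerClient` are hypothetical stand-ins for YARN's `ContainerId` and the resource manager client, and the `boolean` return value is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: stand-in types, not the Flink or Hadoop API.
class ContainerIdMapSketch {

    /** Hypothetical stand-in for a YARN ContainerId. */
    static final class ContainerHandle {
        final String id;

        ContainerHandle(String id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for the client that hands containers back to YARN. */
    static final class ContainerClient {
        final List<String> released = new ArrayList<>();

        void releaseAssignedContainer(ContainerHandle container) {
            released.add(container.id);
        }
    }

    // Maps the resource id string to the container allocated for that worker.
    private final Map<String, ContainerHandle> containerIdMap = new ConcurrentHashMap<>();
    private final ContainerClient client;

    ContainerIdMapSketch(ContainerClient client) {
        this.client = client;
    }

    /** Records the mapping when a container is allocated for a worker. */
    void registerWorker(String resourceId, ContainerHandle container) {
        containerIdMap.put(resourceId, container);
    }

    /** Removes the mapping and releases the container; returns false if the id is unknown. */
    boolean stopWorker(String resourceId) {
        ContainerHandle container = containerIdMap.remove(resourceId);
        if (container == null) {
            return false;
        }
        client.releaseAssignedContainer(container);
        return true;
    }
}
```

Because `remove` both looks up and deletes the entry, a second `stopWorker` call for the same id finds nothing and releases nothing.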


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4-node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink implement-stopWorker-yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4729


commit e82cb580956d3ee323e40f9c5335cf645b32b99d
Author: Shuyi Chen 
Date:   2017-09-27T05:23:34Z

implement YARN stopWorker logic






[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16175656#comment-16175656
 ] 

Till Rohrmann commented on FLINK-7076:
--

Hi [~yuemeng], are you still working on this?

> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: yuemeng
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.





[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-08-03 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112624#comment-16112624
 ] 

Till Rohrmann commented on FLINK-7076:
--

Hi [~yuemeng], let's first tackle the second problem which is how to release a 
container on the RM side. This basically means how to give it back to Yarn. I 
think the {{YarnResourceManager#stopWorker}} method should do this.



[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-08-02 Thread yuemeng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112117#comment-16112117
 ] 

yuemeng commented on FLINK-7076:


[~till.rohrmann]
As I understand it, to support dynamic scaling we need to know when a new 
container should be allocated from the RM and when to release a container that 
is marked as free.
This issue addresses how to define a free container (TM) and release it from the RM.
