Updated Branches: refs/heads/helix-provisioning 180aafe5b -> 27f627265
Provisioner returns ListenableFutures Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/27f62726 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/27f62726 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/27f62726 Branch: refs/heads/helix-provisioning Commit: 27f627265713bbb93535e8d5ff0aee57c7ae46e9 Parents: 180aafe Author: Kanak Biscuitwala <[email protected]> Authored: Thu Jan 9 18:02:36 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Thu Jan 9 18:02:36 2014 -0800 ---------------------------------------------------------------------- .../provisioner/ContainerProvider.java | 10 +- .../controller/provisioner/ContainerState.java | 4 +- .../stages/ContainerProvisioningStage.java | 150 +++++++++++++------ .../integration/TestLocalContainerProvider.java | 28 +++- .../yarn/GenericApplicationMaster.java | 15 +- .../provisioning/yarn/YarnProvisioner.java | 79 +++++----- 6 files changed, 181 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java index 2dee697..c88733f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java @@ -1,5 +1,7 @@ package org.apache.helix.controller.provisioner; +import com.google.common.util.concurrent.ListenableFuture; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,13 +23,13 @@ package org.apache.helix.controller.provisioner; public interface ContainerProvider { - ContainerId allocateContainer(ContainerSpec spec); + ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec); - boolean deallocateContainer(ContainerId containerId); + ListenableFuture<Boolean> deallocateContainer(ContainerId containerId); - boolean startContainer(ContainerId containerId); + ListenableFuture<Boolean> startContainer(ContainerId containerId); - boolean stopContainer(ContainerId containerId); + ListenableFuture<Boolean> stopContainer(ContainerId containerId); ContainerState getContainerState(ContainerId containerId); } http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java index c2e5649..cf4b736 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java @@ -25,8 +25,8 @@ public enum ContainerState { CONNECTING, ACTIVE, TEARDOWN, - FAILED, HALTED, FINALIZING, - FINALIZED + FINALIZED, + FAILED } http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index c4cc5d8..97b80b9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -44,6 +44,10 @@ import org.apache.helix.controller.provisioner.TargetProviderResponse; import org.apache.helix.model.InstanceConfig; import org.apache.log4j.Logger; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + /** * This stage will manager the container allocation/deallocation needed for a * specific resource.<br/> @@ -61,12 +65,12 @@ public class ContainerProvisioningStage extends AbstractBaseStage { @Override public void process(ClusterEvent event) throws Exception { - HelixManager helixManager = event.getAttribute("helixmanager"); - Map<ResourceId, ResourceConfig> resourceMap = + final HelixManager helixManager = event.getAttribute("helixmanager"); + final Map<ResourceId, ResourceConfig> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString()); - HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); - HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); + final HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); + final PropertyKey.Builder keyBuilder = accessor.keyBuilder(); for (ResourceId resourceId : resourceMap.keySet()) { ResourceConfig resourceConfig = resourceMap.get(resourceId); ProvisionerConfig provisionerConfig = resourceConfig.getProvisionerConfig(); @@ -89,8 +93,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage { } } - Cluster cluster = event.getAttribute("ClusterDataCache"); - Collection<Participant> participants = cluster.getParticipantMap().values(); + final Cluster cluster = event.getAttribute("ClusterDataCache"); + final Collection<Participant> participants = cluster.getParticipantMap().values(); // Participants registered in helix // Give those participants to targetprovider @@ -103,13 +107,13 @@ public class ContainerProvisioningStage extends AbstractBaseStage { // TargetProvider should be stateless, given the state of cluster and existing participants // it should return the same result - TargetProviderResponse response = + final TargetProviderResponse response = provisioner.evaluateExistingContainers(cluster, resourceId, participants); // allocate new containers - for (ContainerSpec spec : response.getContainersToAcquire()) { + for (final ContainerSpec spec : response.getContainersToAcquire()) { // random participant id - ParticipantId participantId = ParticipantId.from(UUID.randomUUID().toString()); + final ParticipantId participantId = ParticipantId.from(UUID.randomUUID().toString()); // create a new Participant, attach the container spec InstanceConfig instanceConfig = new InstanceConfig(participantId); instanceConfig.setContainerSpec(spec); @@ -117,74 +121,128 @@ public class ContainerProvisioningStage extends AbstractBaseStage { instanceConfig.setContainerState(ContainerState.ACQUIRING); // create the helix participant and add it to cluster helixAdmin.addInstance(cluster.getId().toString(), instanceConfig); - ContainerId containerId = provisioner.allocateContainer(spec); - InstanceConfig existingInstance = - helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString()); - existingInstance.setContainerId(containerId); - existingInstance.setContainerState(ContainerState.ACQUIRED); - accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()), - existingInstance); + + ListenableFuture<ContainerId> future = provisioner.allocateContainer(spec); + Futures.addCallback(future, new FutureCallback<ContainerId>() { + @Override + public void onSuccess(ContainerId containerId) { + InstanceConfig existingInstance = + helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString()); + existingInstance.setContainerId(containerId); + existingInstance.setContainerState(ContainerState.ACQUIRED); + accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()), + existingInstance); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Could not allocate a container for participant " + participantId, t); + updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId, + ContainerState.FAILED); + } + }); } // start new containers - for (Participant participant : response.getContainersToStart()) { - InstanceConfig existingInstance = + for (final Participant participant : response.getContainersToStart()) { + final InstanceConfig existingInstance = helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() .toString()); - ContainerId containerId = existingInstance.getContainerId(); + final ContainerId containerId = existingInstance.getContainerId(); existingInstance.setContainerId(containerId); existingInstance.setContainerState(ContainerState.CONNECTING); accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // create the helix participant and add it to cluster - provisioner.startContainer(containerId); - existingInstance = - helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() - .toString()); - existingInstance.setContainerState(ContainerState.ACTIVE); - accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()), - existingInstance); + ListenableFuture<Boolean> future = provisioner.startContainer(containerId); + Futures.addCallback(future, new FutureCallback<Boolean>() { + @Override + public void onSuccess(Boolean result) { + updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(), + ContainerState.ACTIVE); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Could not start container" + containerId + "for participant " + + participant.getId(), t); + updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(), + ContainerState.FAILED); + } + }); } // release containers - for (Participant participant : response.getContainersToRelease()) { - // this will change the container state - InstanceConfig existingInstance = + for (final Participant participant : response.getContainersToRelease()) { + // mark it as finalizing + final InstanceConfig existingInstance = helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() .toString()); - ContainerId containerId = existingInstance.getContainerId(); + final ContainerId containerId = existingInstance.getContainerId(); existingInstance.setContainerState(ContainerState.FINALIZING); accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); - provisioner.deallocateContainer(containerId); // remove the participant - existingInstance = - helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() - .toString()); - helixAdmin.dropInstance(cluster.getId().toString(), existingInstance); + ListenableFuture<Boolean> future = provisioner.deallocateContainer(containerId); + Futures.addCallback(future, new FutureCallback<Boolean>() { + @Override + public void onSuccess(Boolean result) { + InstanceConfig existingInstance = + helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() + .toString()); + helixAdmin.dropInstance(cluster.getId().toString(), existingInstance); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Could not deallocate container" + containerId + "for participant " + + participant.getId(), t); + updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(), + ContainerState.FAILED); + } + }); } // stop but don't remove - for (Participant participant : response.getContainersToStop()) { + for (final Participant participant : response.getContainersToStop()) { // disable the node first - InstanceConfig existingInstance = + final InstanceConfig existingInstance = helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() .toString()); - ContainerId containerId = existingInstance.getContainerId(); + final ContainerId containerId = existingInstance.getContainerId(); existingInstance.setInstanceEnabled(false); existingInstance.setContainerState(ContainerState.TEARDOWN); accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // stop the container - provisioner.stopContainer(containerId); - existingInstance = - helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() - .toString()); - existingInstance.setContainerState(ContainerState.HALTED); - accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()), - existingInstance); + ListenableFuture<Boolean> future = provisioner.stopContainer(containerId); + Futures.addCallback(future, new FutureCallback<Boolean>() { + @Override + public void onSuccess(Boolean result) { + updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(), + ContainerState.HALTED); + } + + @Override + public void onFailure(Throwable t) { + LOG.error( + "Could not stop container" + containerId + "for participant " + + participant.getId(), t); + updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(), + ContainerState.FAILED); + } + }); } } } } + + private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor, + PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId, + ContainerState state) { + InstanceConfig existingInstance = + helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString()); + existingInstance.setContainerState(state); + accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java index 6439fed..7b8a580 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java @@ -62,6 +62,8 @@ import org.testng.annotations.Test; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; public class TestLocalContainerProvider extends ZkUnitTestBase { private static final int MAX_PARTICIPANTS = 10; @@ -219,24 +221,28 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { } @Override - public ContainerId allocateContainer(ContainerSpec spec) { + public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) { // allocation is a no-op ContainerId containerId = spec.getContainerId(); _states.put(containerId, ContainerState.ACQUIRED); allocated++; - return containerId; + SettableFuture<ContainerId> future = SettableFuture.create(); + future.set(containerId); + return future; } @Override - public boolean deallocateContainer(ContainerId containerId) { + public ListenableFuture<Boolean> deallocateContainer(ContainerId containerId) { // deallocation is a no-op _states.put(containerId, ContainerState.FINALIZED); deallocated++; - return true; + SettableFuture<Boolean> future = SettableFuture.create(); + future.set(true); + return future; } @Override - public boolean startContainer(ContainerId containerId) { + public ListenableFuture<Boolean> startContainer(ContainerId containerId) { ParticipantService participant = new ParticipantService(_clusterId, _containerParticipants.get(containerId)); participant.startAsync(); @@ -244,17 +250,21 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { _participants.put(containerId, participant); _states.put(containerId, ContainerState.ACTIVE); started++; - return true; + SettableFuture<Boolean> future = SettableFuture.create(); + future.set(true); + return future; } @Override - public boolean stopContainer(ContainerId containerId) { + public ListenableFuture<Boolean> stopContainer(ContainerId containerId) { ParticipantService participant = _participants.get(containerId); participant.stopAsync(); participant.awaitTerminated(); _states.put(containerId, ContainerState.HALTED); stopped++; - return true; + SettableFuture<Boolean> future = SettableFuture.create(); + future.set(true); + return future; } @Override @@ -298,6 +308,8 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { // halted containers can be released containersToRelease.add(participant); break; + default: + break; } ContainerId containerId = containerConfig.getId(); if (containerId != null) { http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java index d3f410f..3b4e937 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java @@ -30,10 +30,6 @@ import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,7 +46,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -61,9 +56,9 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; /** @@ -237,26 +232,26 @@ public class GenericApplicationMaster { } - public Future<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) { + public ListenableFuture<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) { amRMClient.addContainerRequest(containerAsk); numRequestedContainers.incrementAndGet(); SettableFuture<ContainerAskResponse> future = SettableFuture.create(); return future; } - public Future<ContainerStopResponse> stopContainer(Container container) { + public ListenableFuture<ContainerStopResponse> stopContainer(Container container) { nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId()); SettableFuture<ContainerStopResponse> future = SettableFuture.create(); return future; } - public Future<ContainerReleaseResponse> releaseContainer(Container container) { + public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) { amRMClient.releaseAssignedContainer(container.getId()); SettableFuture<ContainerReleaseResponse> future = SettableFuture.create(); return future; } - public Future<ContainerLaunchResponse> launchContainer(Container container, + public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container, ContainerLaunchContext containerLaunchContext) { nmClientAsync.startContainerAsync(container, containerLaunchContext); SettableFuture<ContainerLaunchResponse> future = SettableFuture.create(); http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index bfaa209..e921c87 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -3,8 +3,7 @@ package org.apache.helix.provisioning.yarn; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -24,60 +23,70 @@ import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.Provisioner; import org.apache.helix.controller.provisioner.TargetProviderResponse; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + public class YarnProvisioner implements Provisioner { private static final Log LOG = LogFactory.getLog(YarnProvisioner.class); static GenericApplicationMaster applicationMaster; + static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>(); @Override - public ContainerId allocateContainer(ContainerSpec spec) { + public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) { ContainerRequest containerAsk = setupContainerAskForRM(spec); - Future<ContainerAskResponse> requestNewContainer = + ListenableFuture<ContainerAskResponse> requestNewContainer = applicationMaster.acquireContainer(containerAsk); - ContainerAskResponse containerAskResponse; - try { - containerAskResponse = requestNewContainer.get(); - ContainerId helixContainerId = - ContainerId.from(containerAskResponse.getContainer().getId().toString()); - allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer()); - return helixContainerId; - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - return null; + return Futures.transform(requestNewContainer, new Function<ContainerAskResponse, ContainerId>() { + @Override + public ContainerId apply(ContainerAskResponse containerAskResponse) { + ContainerId helixContainerId = + ContainerId.from(containerAskResponse.getContainer().getId().toString()); + allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer()); + return helixContainerId; + } + }); } @Override - public boolean deallocateContainer(ContainerId containerId) { - Future<ContainerReleaseResponse> releaseContainer = + public ListenableFuture<Boolean> deallocateContainer(ContainerId containerId) { + ListenableFuture<ContainerReleaseResponse> releaseContainer = applicationMaster.releaseContainer(allocatedContainersMap.get(containerId)); - try { - releaseContainer.get(); - return true; - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (ExecutionException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return false; + return Futures.transform(releaseContainer, new Function<ContainerReleaseResponse, Boolean>() { + @Override + public Boolean apply(ContainerReleaseResponse response) { + return response != null; + } + }, service); } @Override - public boolean startContainer(ContainerId containerId) { + public ListenableFuture<Boolean> startContainer(final ContainerId containerId) { Container container = allocatedContainersMap.get(containerId); ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class); - applicationMaster.launchContainer(container, containerLaunchContext); - return false; + ListenableFuture<ContainerLaunchResponse> future = applicationMaster.launchContainer(container, containerLaunchContext); + return Futures.transform(future, new Function<ContainerLaunchResponse, Boolean>() { + @Override + public Boolean apply(ContainerLaunchResponse response) { + return response != null; + } + }, service); } @Override - public boolean stopContainer(ContainerId containerId) { - return false; + public ListenableFuture<Boolean> stopContainer(final ContainerId containerId) { + Container container = allocatedContainersMap.get(containerId); + ListenableFuture<ContainerStopResponse> future = applicationMaster.stopContainer(container); + return Futures.transform(future, new Function<ContainerStopResponse, Boolean>() { + @Override + public Boolean apply(ContainerStopResponse response) { + return response != null; + } + }, service); } @Override
