Repository: incubator-twill Updated Branches: refs/heads/master d835cafc9 -> 0c20804c2
Potential race condition when restart all is called for a Twill runnable. If restarting all instances is requested for a TwillRunnable, there could be a race condition between checking the provisioned containers and the pending container requests, which could cause the TwillApplication to exit. This PR contains the following changes: -) Change the container requests queue to a ConcurrentLinkedQueue, since it is accessed by multiple threads. -) Add a new volatile flag in RunnableContainerRequest to indicate whether it is ready to be provisioned. -) Add the container requests for a restart before removing the running instances. -) Move execution of the restart into a thread of the add-instances executor. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/0c20804c Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/0c20804c Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/0c20804c Branch: refs/heads/master Commit: 0c20804c2001591dec590461a26d3b594601f4a9 Parents: d835caf Author: hsaputra <[email protected]> Authored: Wed Jul 29 16:34:12 2015 -0700 Committer: Terence Yim <[email protected]> Committed: Thu Jul 30 12:56:58 2015 -0700 ---------------------------------------------------------------------- .../appmaster/ApplicationMasterService.java | 82 ++++++++++++++------ .../appmaster/RunnableContainerRequest.java | 16 ++++ 2 files changed, 76 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/0c20804c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index cbf013b..818db05 100644 --- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -93,6 +93,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -364,7 +365,15 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp // If nothing is in provisioning, and no pending request, move to next one while (provisioning.isEmpty() && currentRequest == null && !runnableContainerRequests.isEmpty()) { - currentRequest = runnableContainerRequests.peek().takeRequest(); + RunnableContainerRequest runnableContainerRequest = runnableContainerRequests.peek(); + if (!runnableContainerRequest.isReadyToBeProvisioned()) { + // take it out from queue and put it back at the end for second chance. + runnableContainerRequest = runnableContainerRequests.poll(); + runnableContainerRequests.add(runnableContainerRequest); + + continue; + } + currentRequest = runnableContainerRequest.takeRequest(); if (currentRequest == null) { // All different types of resource request from current order is done, move to next one // TODO: Need to handle order type as well @@ -524,7 +533,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private Queue<RunnableContainerRequest> initContainerRequests() { // Orderly stores container requests. - Queue<RunnableContainerRequest> requests = Lists.newLinkedList(); + Queue<RunnableContainerRequest> requests = new ConcurrentLinkedQueue<>(); // For each order in the twillSpec, create container request for runnables, depending on Placement policy. 
for (TwillSpecification.Order order : twillSpec.getOrders()) { Set<String> distributedRunnables = placementPolicyManager.getDistributedRunnables(order.getNames()); @@ -752,6 +761,12 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private RunnableContainerRequest createRunnableContainerRequest(final String runnableName, final int numberOfInstances) { + return createRunnableContainerRequest(runnableName, numberOfInstances, true); + } + + private RunnableContainerRequest createRunnableContainerRequest(final String runnableName, + final int numberOfInstances, + final boolean isProvisioned) { // Find the current order of the given runnable in order to create a RunnableContainerRequest. TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() { @Override @@ -775,7 +790,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp AllocationSpecification allocationSpecification = new AllocationSpecification(capability); addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec); } - return new RunnableContainerRequest(order.getType(), requestsMap); + return new RunnableContainerRequest(order.getType(), requestsMap, isProvisioned); } private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) { @@ -826,7 +841,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp // ... for a runnable ... String runnableName = message.getRunnableName(); LOG.debug("Start restarting all runnable {} instances.", runnableName); - restartRunnableInstances(runnableName, null); + restartRunnableInstances(runnableName, null, completion); } else { // ... 
or maybe some runnables for (Map.Entry<String, String> option : requestCommand.getOptions().entrySet()) { @@ -835,35 +850,58 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp new TypeToken<Set<Integer>>() {}.getType()); LOG.debug("Start restarting runnable {} instances {}", runnableName, restartedInstanceIds); - restartRunnableInstances(runnableName, restartedInstanceIds); + restartRunnableInstances(runnableName, restartedInstanceIds, completion); } } - completion.run(); return true; } /** * Helper method to restart instances of runnables. */ - private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds) { - LOG.debug("Begin restart runnable {} instances.", runnableName); + private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds, + final Runnable completion) { + Runnable restartInstancesRunnable = createRestartInstancesRunnable(runnableName, instanceIds, completion); + instanceChangeExecutor.execute(restartInstancesRunnable); + } - Set<Integer> instancesToRemove = instanceIds; - if (instancesToRemove == null) { - instancesToRemove = Ranges.closedOpen(0, runningContainers.count(runnableName)).asSet(DiscreteDomains.integers()); - } + /** + * Creates a Runnable for execution of restart instances request. + */ + private Runnable createRestartInstancesRunnable(final String runnableName, @Nullable final Set<Integer> instanceIds, + final Runnable completion) { + return new Runnable() { + @Override + public void run() { + LOG.debug("Begin restart runnable {} instances.", runnableName); - for (int instanceId : instancesToRemove) { - LOG.debug("Remove instance {} for runnable {}", instanceId, runnableName); - try { - runningContainers.removeById(runnableName, instanceId); - } catch (Exception ex) { - // could be thrown if the container already stopped. 
- LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId); + Set<Integer> instancesToRemove = instanceIds; + if (instancesToRemove == null) { + instancesToRemove = + Ranges.closedOpen(0, runningContainers.count(runnableName)).asSet(DiscreteDomains.integers()); + } + + LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName); + RunnableContainerRequest containerRequest = + createRunnableContainerRequest(runnableName, instancesToRemove.size(), false); + runnableContainerRequests.add(containerRequest); + + for (int instanceId : instancesToRemove) { + LOG.debug("Remove instance {} for runnable {}", instanceId, runnableName); + try { + runningContainers.removeById(runnableName, instanceId); + } catch (Exception ex) { + // could be thrown if the container already stopped. + LOG.info("Exception thrown when stopping instance {} probably already stopped.", instanceId); + } + } + + // set the container request to be ready + containerRequest.setReadyToBeProvisioned(true); + + completion.run(); } - } - LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName); - runnableContainerRequests.add(createRunnableContainerRequest(runnableName, instancesToRemove.size())); + }; } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/0c20804c/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java index 2105629..e001121 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java @@ -34,17 +34,33 @@ import java.util.Map; final class 
RunnableContainerRequest { private final TwillSpecification.Order.Type orderType; private final Iterator<Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>>> requests; + private volatile boolean isReadyToBeProvisioned; RunnableContainerRequest(TwillSpecification.Order.Type orderType, Map<AllocationSpecification, Collection<RuntimeSpecification>> requests) { + this(orderType, requests, true); + } + + RunnableContainerRequest(TwillSpecification.Order.Type orderType, + Map<AllocationSpecification, Collection<RuntimeSpecification>> requests, + boolean isReadyToBeProvisioned) { this.orderType = orderType; this.requests = requests.entrySet().iterator(); + this.isReadyToBeProvisioned = isReadyToBeProvisioned; } TwillSpecification.Order.Type getOrderType() { return orderType; } + public boolean isReadyToBeProvisioned() { + return isReadyToBeProvisioned; + } + + public void setReadyToBeProvisioned(boolean isProvisioned) { + this.isReadyToBeProvisioned = isProvisioned; + } + /** * Remove a resource request and return it. * @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or
