SLIDER-302 add a renewing action which executes then reschedules a nested action
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/8ac67486 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/8ac67486 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/8ac67486 Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: 8ac67486f79a0e12a3047e9fadf440bc88c13d51 Parents: 323a80f Author: Steve Loughran <ste...@apache.org> Authored: Sat Aug 9 13:46:05 2014 +0100 Committer: Steve Loughran <ste...@apache.org> Committed: Sat Aug 9 13:46:05 2014 +0100 ---------------------------------------------------------------------- .../server/appmaster/actions/ActionHalt.java | 9 +- .../appmaster/actions/ActionStartContainer.java | 6 +- .../appmaster/actions/ActionStopQueue.java | 9 +- .../appmaster/actions/ActionStopSlider.java | 2 +- .../server/appmaster/actions/AsyncAction.java | 55 ++++++++-- .../actions/ProviderReportedContainerLoss.java | 2 +- .../actions/ProviderStartupCompleted.java | 2 +- .../server/appmaster/actions/QueueAccess.java | 3 + .../server/appmaster/actions/QueueExecutor.java | 2 +- .../appmaster/actions/RenewingAction.java | 105 +++++++++++++++++++ .../server/appmaster/actions/TestActions.groovy | 37 ++++++- 11 files changed, 208 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java index 1468cd8..44337fc 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java +++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java @@ -21,6 +21,10 @@ package org.apache.slider.server.appmaster.actions; import org.apache.hadoop.util.ExitUtil; import org.apache.slider.server.appmaster.SliderAppMaster; +/** + * Exit a JVM halt. + * @see ExitUtil#halt(int, String) + */ public class ActionHalt extends AsyncAction { private final int status; @@ -28,14 +32,15 @@ public class ActionHalt extends AsyncAction { public ActionHalt( int status, - String text, int delay) { + String text, + int delay) { super("Halt", delay, ActionAttributes.HALTS_CLUSTER); this.status = status; this.text = text; } @Override - public void execute(SliderAppMaster appMaster) throws Exception { + public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { ExitUtil.halt(status, text); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java index 6179f35..1a86e5c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java @@ -23,6 +23,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.slider.server.appmaster.SliderAppMaster; import org.apache.slider.server.appmaster.state.RoleInstance; +/** + * Start a container + * @see SliderAppMaster#startContainer(Container, ContainerLaunchContext, RoleInstance) + */ public class ActionStartContainer extends AsyncAction { private final Container container; @@ -41,7 +45,7 @@ public class ActionStartContainer 
extends AsyncAction { } @Override - public void execute(SliderAppMaster appMaster) throws Exception { + public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { appMaster.startContainer(container, ctx, instance); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java index a11bccb..7a31f12 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java @@ -20,6 +20,8 @@ package org.apache.slider.server.appmaster.actions; import org.apache.slider.server.appmaster.SliderAppMaster; +import java.util.concurrent.TimeUnit; + /** * Action to tell a queue executor to stop -after handing this on/executing it */ @@ -29,8 +31,13 @@ public class ActionStopQueue extends AsyncAction { super("stop queue", delay); } + public ActionStopQueue(int delay, + TimeUnit timeUnit) { + super("stop queue", delay, timeUnit); + } + @Override - public void execute(SliderAppMaster appMaster) throws Exception { + public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { // no-op } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java index 0e6dc25..24cad1c 
100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java @@ -28,7 +28,7 @@ public class ActionStopSlider extends AsyncAction { } @Override - public void execute(SliderAppMaster appMaster) throws Exception { + public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { String message = name; SliderAppMaster.getLog().info("SliderAppMasterApi.stopCluster: {}", message); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java index 5e325a0..fed28e4 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java @@ -33,7 +33,7 @@ public abstract class AsyncAction implements Delayed { private static final AtomicLong sequencer = new AtomicLong(0); public final String name; - public final long nanos; + private long nanos; private final EnumSet<ActionAttributes> attrs; private final long sequenceNumber = sequencer.incrementAndGet(); @@ -44,36 +44,55 @@ public abstract class AsyncAction implements Delayed { protected AsyncAction(String name, int delayMillis) { + this(name, delayMillis, TimeUnit.MILLISECONDS); + } + + protected AsyncAction(String name, + int delay, + TimeUnit timeUnit) { this.name = name; - this.nanos = convertAndOffset(delayMillis); + this.setNanos(convertAndOffset(delay, timeUnit)); attrs = EnumSet.noneOf(ActionAttributes.class); } protected AsyncAction(String name, - int delayMillis, 
EnumSet<ActionAttributes> attrs) { + int delay, + TimeUnit timeUnit, + EnumSet<ActionAttributes> attrs) { this.name = name; - this.nanos = convertAndOffset(delayMillis); + this.setNanos(convertAndOffset(delay, timeUnit)); this.attrs = attrs; } protected AsyncAction(String name, - int delayMillis, + int delay, + TimeUnit timeUnit, ActionAttributes... attributes) { - this(name, delayMillis); + this(name, delay, timeUnit); Collections.addAll(attrs, attributes); } + + protected AsyncAction(String name, + int delayMillis, + ActionAttributes... attributes) { + this(name, delayMillis, TimeUnit.MILLISECONDS); + } - private long convertAndOffset(int delay) { - return now() + TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS); + protected long convertAndOffset(int delay, TimeUnit timeUnit) { + return now() + TimeUnit.NANOSECONDS.convert(delay, timeUnit); } + /** + * The current time in nanos + * @return now + */ protected long now() { return System.nanoTime(); } @Override public long getDelay(TimeUnit unit) { - return unit.convert(nanos - now(), TimeUnit.NANOSECONDS); + return unit.convert(getNanos() - now(), TimeUnit.NANOSECONDS); } @Override @@ -91,13 +110,17 @@ public abstract class AsyncAction implements Delayed { final StringBuilder sb = new StringBuilder(super.toString()); sb.append(" name='").append(name).append('\''); - sb.append(", nanos=").append(nanos); + sb.append(", nanos=").append(getNanos()); sb.append(", attrs=").append(attrs); sb.append(", sequenceNumber=").append(sequenceNumber); sb.append('}'); return sb.toString(); } + protected EnumSet<ActionAttributes> getAttrs() { + return attrs; + } + /** * Ask if an action has a specific attribute * @param attr attribute @@ -110,9 +133,19 @@ public abstract class AsyncAction implements Delayed { /** * Actual application * @param appMaster + * @param queueService * @throws IOException */ - public abstract void execute(SliderAppMaster appMaster) throws Exception; + public abstract void execute(SliderAppMaster 
appMaster, + QueueAccess queueService) throws Exception; + + public long getNanos() { + return nanos; + } + + public void setNanos(long nanos) { + this.nanos = nanos; + } public enum ActionAttributes { SHRINKS_CLUSTER, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java index 63b40d5..6a2cc6b 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java @@ -41,7 +41,7 @@ public class ProviderReportedContainerLoss extends AsyncAction { } @Override - public void execute(SliderAppMaster appMaster) throws Exception { + public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { appMaster.providerLostContainer(containerId); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java index 293492f..4e06f7a 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java @@ -31,7 +31,7 @@ public class ProviderStartupCompleted 
extends AsyncAction { } @Override - public void execute(SliderAppMaster appMaster) throws Exception { + public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { appMaster.eventCallbackEvent(null); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java index f717085..160333d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java @@ -18,6 +18,9 @@ package org.apache.slider.server.appmaster.actions; +/** + * Access for queue operations + */ public interface QueueAccess { /** * Put an action on the immediate queue -to be executed when the queue http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java index 072f324..bc2b260 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java @@ -62,7 +62,7 @@ public class QueueExecutor implements Runnable { do { take = actionQueues.actionQueue.take(); log.debug("Executing {}", take); - take.execute(appMaster); + take.execute(appMaster, actionQueues); } while (!(take instanceof ActionStopQueue)); log.info("Queue 
Executor run() stopped"); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java new file mode 100644 index 0000000..30870bf --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This action executes then reschedules an inner action; a limit + * can specify the number of times to run + */ + +public class RenewingAction extends AsyncAction{ + private static final Logger log = + LoggerFactory.getLogger(RenewingAction.class); + private final AsyncAction action; + private final int interval; + private final TimeUnit timeUnit; + public final AtomicInteger executionCount = new AtomicInteger(); + public final int limit; + + + /** + * Rescheduling action + * @param action action to execute + * @param initialDelay initial delay + * @param interval interval for later delays + * @param timeUnit time unit for all times + * @param limit limit on the no. of executions. If 0 or less: no limit + */ + public RenewingAction(AsyncAction action, + int initialDelay, + int interval, TimeUnit timeUnit, + int limit) { + super("renewing " + action.name, initialDelay, timeUnit, action.getAttrs()); + this.action = action; + this.interval = interval; + this.timeUnit = timeUnit; + this.limit = limit; + } + + /** + * Execute the inner action then reschedule ourselves + * @param appMaster + * @param queueService + * @throws Exception + */ + @Override + public void execute(SliderAppMaster appMaster, QueueAccess queueService) + throws Exception { + long exCount = executionCount.incrementAndGet(); + log.debug("{}: Executing inner action count # {}", this, exCount); + action.execute(appMaster, queueService); + boolean reschedule = true; + if (limit > 0) { + reschedule = limit > exCount; + } + if (reschedule) { + this.setNanos(convertAndOffset(interval, timeUnit)); + log.debug("{}: rescheduling, new offset {} mS ", this, + getDelay(TimeUnit.MILLISECONDS)); + queueService.putDelayed(this); + } + } 
+ + public AsyncAction getAction() { + return action; + } + + public int getInterval() { + return interval; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + public int getExecutionCount() { + return executionCount.get(); + } + + public int getLimit() { + return limit; + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy index 5dffea0..1e22acd 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy @@ -139,27 +139,52 @@ class TestActions { queues.putDelayed(note2) queues.putDelayed(stop) // async to sync expected to run in order + runQueuesToCompletion() + assert note1.executed.get() + assert note2.executed.get() + } + + public void runQueuesToCompletion() { queues.run(); assert queues.delayedActions.empty assert !queues.actionQueue.empty QueueExecutor ex = new QueueExecutor(queues) ex.run(); assert queues.actionQueue.empty - assert note1.executed.get() - assert note2.executed.get() + } + + @Test + public void testRenewedActionFiresOnceAtLeast() throws Throwable { + ActionNoteExecuted note1 = new ActionNoteExecuted("note1", 500) + RenewingAction renewer = new RenewingAction( + note1, + 500, + 100, + TimeUnit.MILLISECONDS, + 3) + queues.putDelayed(renewer); + def stop = new ActionStopQueue(4, TimeUnit.SECONDS) + queues.putDelayed(stop); + // this runs all the delayed actions FIRST, so can't be used + // to play tricks of renewing actions ahead of the stop action + runQueuesToCompletion() + assert renewer.executionCount == 1 + assert 
note1.executionCount == 1 + // assert the renewed item is back in + assert queues.delayedActions.contains(renewer) } public class ActionNoteExecuted extends AsyncAction { public final AtomicBoolean executed = new AtomicBoolean(false); public final AtomicLong executionTimeNanos = new AtomicLong() - public final AtomicLong executionCount = new AtomicLong() + private final AtomicLong executionCount = new AtomicLong() public ActionNoteExecuted(String text, int delay) { super(text, delay); } @Override - public void execute(SliderAppMaster appMaster) throws Exception { + public void execute(SliderAppMaster appMaster, QueueAccess queueService) throws Exception { log.info("Executing $name"); executed.set(true); executionTimeNanos.set(System.nanoTime()) @@ -175,6 +200,8 @@ class TestActions { " executed=${executed.get()}; count=${executionCount.get()};" } - + long getExecutionCount() { + return executionCount.get() + } } }