SLIDER-302 add a renewing action which executes then rechedules a nested action


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/8ac67486
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/8ac67486
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/8ac67486

Branch: refs/heads/feature/SLIDER-151_REST_API
Commit: 8ac67486f79a0e12a3047e9fadf440bc88c13d51
Parents: 323a80f
Author: Steve Loughran <ste...@apache.org>
Authored: Sat Aug 9 13:46:05 2014 +0100
Committer: Steve Loughran <ste...@apache.org>
Committed: Sat Aug 9 13:46:05 2014 +0100

----------------------------------------------------------------------
 .../server/appmaster/actions/ActionHalt.java    |   9 +-
 .../appmaster/actions/ActionStartContainer.java |   6 +-
 .../appmaster/actions/ActionStopQueue.java      |   9 +-
 .../appmaster/actions/ActionStopSlider.java     |   2 +-
 .../server/appmaster/actions/AsyncAction.java   |  55 ++++++++--
 .../actions/ProviderReportedContainerLoss.java  |   2 +-
 .../actions/ProviderStartupCompleted.java       |   2 +-
 .../server/appmaster/actions/QueueAccess.java   |   3 +
 .../server/appmaster/actions/QueueExecutor.java |   2 +-
 .../appmaster/actions/RenewingAction.java       | 105 +++++++++++++++++++
 .../server/appmaster/actions/TestActions.groovy |  37 ++++++-
 11 files changed, 208 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java
index 1468cd8..44337fc 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java
@@ -21,6 +21,10 @@ package org.apache.slider.server.appmaster.actions;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.slider.server.appmaster.SliderAppMaster;
 
+/**
+ * Exit a JVM halt.
+ * @see ExitUtil#halt(int, String) 
+ */
 public class ActionHalt extends AsyncAction {
 
   private final int status;
@@ -28,14 +32,15 @@ public class ActionHalt extends AsyncAction {
 
   public ActionHalt(
       int status,
-      String text, int delay) {
+      String text,
+      int delay) {
     super("Halt", delay, ActionAttributes.HALTS_CLUSTER);
     this.status = status;
     this.text = text;
   }
 
   @Override
-  public void execute(SliderAppMaster appMaster) throws Exception {
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService) 
throws Exception {
     ExitUtil.halt(status, text);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java
index 6179f35..1a86e5c 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java
@@ -23,6 +23,10 @@ import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.slider.server.appmaster.SliderAppMaster;
 import org.apache.slider.server.appmaster.state.RoleInstance;
 
+/**
+ * Start a container
+ * @see SliderAppMaster#startContainer(Container, ContainerLaunchContext, 
RoleInstance) 
+ */
 public class ActionStartContainer extends AsyncAction {
 
   private final Container container;
@@ -41,7 +45,7 @@ public class ActionStartContainer extends AsyncAction {
   }
 
   @Override
-  public void execute(SliderAppMaster appMaster) throws Exception {
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService) 
throws Exception {
     appMaster.startContainer(container, ctx, instance);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
index a11bccb..7a31f12 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java
@@ -20,6 +20,8 @@ package org.apache.slider.server.appmaster.actions;
 
 import org.apache.slider.server.appmaster.SliderAppMaster;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Action to tell a queue executor to stop -after handing this on/executing it
  */
@@ -29,8 +31,13 @@ public class ActionStopQueue extends AsyncAction {
     super("stop queue", delay);
   }
 
+  public ActionStopQueue(int delay,
+      TimeUnit timeUnit) {
+    super("stop queue", delay, timeUnit);
+  }
+
   @Override
-  public void execute(SliderAppMaster appMaster) throws Exception {
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService) 
throws Exception {
     // no-op
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java
index 0e6dc25..24cad1c 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java
@@ -28,7 +28,7 @@ public class ActionStopSlider extends AsyncAction {
   }
 
   @Override
-  public void execute(SliderAppMaster appMaster) throws Exception {
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService) 
throws Exception {
     String message = name;
     SliderAppMaster.getLog().info("SliderAppMasterApi.stopCluster: {}",
         message);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
index 5e325a0..fed28e4 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java
@@ -33,7 +33,7 @@ public abstract class AsyncAction implements Delayed {
   private static final AtomicLong sequencer = new AtomicLong(0);
 
   public final String name;
-  public final long nanos;
+  private long nanos;
   private final EnumSet<ActionAttributes> attrs;
   private final long sequenceNumber = sequencer.incrementAndGet();
 
@@ -44,36 +44,55 @@ public abstract class AsyncAction implements Delayed {
 
   protected AsyncAction(String name,
       int delayMillis) {
+    this(name, delayMillis, TimeUnit.MILLISECONDS);
+  }
+
+  protected AsyncAction(String name,
+      int delay,
+      TimeUnit timeUnit) {
     this.name = name;
-    this.nanos = convertAndOffset(delayMillis);
+    this.setNanos(convertAndOffset(delay, timeUnit));
     attrs = EnumSet.noneOf(ActionAttributes.class);
   }
 
   protected AsyncAction(String name,
-      int delayMillis, EnumSet<ActionAttributes> attrs) {
+      int delay,
+      TimeUnit timeUnit,
+      EnumSet<ActionAttributes> attrs) {
     this.name = name;
-    this.nanos = convertAndOffset(delayMillis);
+    this.setNanos(convertAndOffset(delay, timeUnit));
     this.attrs = attrs;
   }
 
   protected AsyncAction(String name,
-      int delayMillis,
+      int delay,
+      TimeUnit timeUnit,
       ActionAttributes... attributes) {
-    this(name, delayMillis);
+    this(name, delay, timeUnit);
     Collections.addAll(attrs, attributes);
   }
+  
+  protected AsyncAction(String name,
+      int delayMillis,
+      ActionAttributes... attributes) {
+    this(name, delayMillis, TimeUnit.MILLISECONDS);
+  }
 
-  private long convertAndOffset(int delay) {
-    return now() + TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS);
+  protected long convertAndOffset(int delay, TimeUnit timeUnit) {
+    return now() + TimeUnit.NANOSECONDS.convert(delay, timeUnit);
   }
 
+  /**
+   * The current time in nanos
+   * @return now
+   */
   protected long now() {
     return System.nanoTime();
   }
 
   @Override
   public long getDelay(TimeUnit unit) {
-    return unit.convert(nanos - now(), TimeUnit.NANOSECONDS);
+    return unit.convert(getNanos() - now(), TimeUnit.NANOSECONDS);
   }
 
   @Override
@@ -91,13 +110,17 @@ public abstract class AsyncAction implements Delayed {
     final StringBuilder sb =
         new StringBuilder(super.toString());
     sb.append(" name='").append(name).append('\'');
-    sb.append(", nanos=").append(nanos);
+    sb.append(", nanos=").append(getNanos());
     sb.append(", attrs=").append(attrs);
     sb.append(", sequenceNumber=").append(sequenceNumber);
     sb.append('}');
     return sb.toString();
   }
 
+  protected EnumSet<ActionAttributes> getAttrs() {
+    return attrs;
+  }
+
   /**
    * Ask if an action has a specific attribute
    * @param attr attribute
@@ -110,9 +133,19 @@ public abstract class AsyncAction implements Delayed {
   /**
    * Actual application
    * @param appMaster
+   * @param queueService
    * @throws IOException
    */
-  public abstract void execute(SliderAppMaster appMaster) throws Exception;
+  public abstract void execute(SliderAppMaster appMaster,
+      QueueAccess queueService) throws Exception;
+
+  public long getNanos() {
+    return nanos;
+  }
+
+  public void setNanos(long nanos) {
+    this.nanos = nanos;
+  }
 
   public enum ActionAttributes {
     SHRINKS_CLUSTER,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java
index 63b40d5..6a2cc6b 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java
@@ -41,7 +41,7 @@ public class ProviderReportedContainerLoss extends 
AsyncAction {
   }
 
   @Override
-  public void execute(SliderAppMaster appMaster) throws Exception {
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService) 
throws Exception {
     appMaster.providerLostContainer(containerId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java
index 293492f..4e06f7a 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java
@@ -31,7 +31,7 @@ public class ProviderStartupCompleted extends AsyncAction {
   }
 
   @Override
-  public void execute(SliderAppMaster appMaster) throws Exception {
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService) 
throws Exception {
     appMaster.eventCallbackEvent(null);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java
index f717085..160333d 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java
@@ -18,6 +18,9 @@
 
 package org.apache.slider.server.appmaster.actions;
 
+/**
+ * Access for queue operations
+ */
 public interface QueueAccess {
   /**
    * Put an action on the immediate queue -to be executed when the queue

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
index 072f324..bc2b260 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java
@@ -62,7 +62,7 @@ public class QueueExecutor implements Runnable {
       do {
         take = actionQueues.actionQueue.take();
         log.debug("Executing {}", take);
-        take.execute(appMaster);
+        take.execute(appMaster, actionQueues);
       } while (!(take instanceof ActionStopQueue));
       log.info("Queue Executor run() stopped");
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
new file mode 100644
index 0000000..30870bf
--- /dev/null
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.actions;
+
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This action executes then reschedules an inner action; a limit
+ * can specify the number of times to run
+ */
+
+public class RenewingAction extends AsyncAction{
+  private static final Logger log =
+      LoggerFactory.getLogger(RenewingAction.class);
+  private final AsyncAction action;
+  private final int interval;
+  private final TimeUnit timeUnit;
+  public final AtomicInteger executionCount = new AtomicInteger();
+  public final int limit;
+
+
+  /**
+   * Rescheduling action
+   * @param action action to execute
+   * @param initialDelay initial delay
+   * @param interval interval for later delays
+   * @param timeUnit time unit for all times
+   * @param limit limit on the no. of executions. If 0 or less: no limit
+   */
+  public RenewingAction(AsyncAction action,
+      int initialDelay,
+      int interval, TimeUnit timeUnit,
+      int limit) {
+    super("renewing " + action.name, initialDelay, timeUnit, 
action.getAttrs());
+    this.action = action;
+    this.interval = interval;
+    this.timeUnit = timeUnit;
+    this.limit = limit;
+  }
+
+  /**
+   * Execute the inner action then reschedule ourselves
+   * @param appMaster
+   * @param queueService
+   * @throws Exception
+   */
+  @Override
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService)
+      throws Exception {
+    long exCount = executionCount.incrementAndGet();
+    log.debug("{}: Executing inner action count # {}", this, exCount);
+    action.execute(appMaster, queueService);
+    boolean reschedule = true;
+    if (limit > 0) {
+      reschedule = limit > exCount;
+    }
+    if (reschedule) {
+      this.setNanos(convertAndOffset(interval, timeUnit));
+      log.debug("{}: rescheduling, new offset {} mS ", this,
+          getDelay(TimeUnit.MILLISECONDS));
+      queueService.putDelayed(this);
+    }
+  }
+
+  public AsyncAction getAction() {
+    return action;
+  }
+
+  public int getInterval() {
+    return interval;
+  }
+
+  public TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  public int getExecutionCount() {
+    return executionCount.get();
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8ac67486/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy
----------------------------------------------------------------------
diff --git 
a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy
 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy
index 5dffea0..1e22acd 100644
--- 
a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy
+++ 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/actions/TestActions.groovy
@@ -139,27 +139,52 @@ class TestActions {
     queues.putDelayed(note2)
     queues.putDelayed(stop)
     // async to sync expected to run in order
+    runQueuesToCompletion()
+    assert note1.executed.get()
+    assert note2.executed.get()
+  }
+
+  public void runQueuesToCompletion() {
     queues.run();
     assert queues.delayedActions.empty
     assert !queues.actionQueue.empty
     QueueExecutor ex = new QueueExecutor(queues)
     ex.run();
     assert queues.actionQueue.empty
-    assert note1.executed.get()
-    assert note2.executed.get()
+  }
+
+  @Test
+  public void testRenewedActionFiresOnceAtLeast() throws Throwable {
+    ActionNoteExecuted note1 = new ActionNoteExecuted("note1", 500)
+    RenewingAction renewer = new RenewingAction(
+        note1,
+        500,
+        100,
+        TimeUnit.MILLISECONDS,
+        3)
+    queues.putDelayed(renewer);
+    def stop = new ActionStopQueue(4, TimeUnit.SECONDS)
+    queues.putDelayed(stop);
+    // this runs all the delayed actions FIRST, so can't be used
+    // to play tricks of renewing actions ahead of the stop action
+    runQueuesToCompletion()
+    assert renewer.executionCount == 1
+    assert note1.executionCount == 1
+    // assert the renewed item is back in
+    assert queues.delayedActions.contains(renewer)
   }
 
   public class ActionNoteExecuted extends AsyncAction {
     public final AtomicBoolean executed = new AtomicBoolean(false);
     public final AtomicLong executionTimeNanos = new AtomicLong()
-    public final AtomicLong executionCount = new AtomicLong()
+    private final AtomicLong executionCount = new AtomicLong()
 
     public ActionNoteExecuted(String text, int delay) {
       super(text, delay);
     }
 
     @Override
-    public void execute(SliderAppMaster appMaster) throws Exception {
+    public void execute(SliderAppMaster appMaster, QueueAccess queueService) 
throws Exception {
       log.info("Executing $name");
       executed.set(true);
       executionTimeNanos.set(System.nanoTime())
@@ -175,6 +200,8 @@ class TestActions {
              " executed=${executed.get()}; count=${executionCount.get()};"
     }
 
-
+    long getExecutionCount() {
+      return executionCount.get()
+    }
   }
 }

Reply via email to