Repository: aurora
Updated Branches:
  refs/heads/master 9ea897978 -> 4ab4b2b2c


Add best effort pulse timestamp recovery.

Currently the scheduler forces all coordinated ("pulsed") updates into
ROLL_FORWARD_AWAITING_PULSE or ROLL_BACK_AWAITING_PULSE on scheduler
startup/recovery. This is because the last pulse timestamp is not durably
stored, so on recovery it is reset to 0L (aka no pulse yet).

In cases where the pulse timeout is large and the failover is fast or frequent,
this causes many updates to unnecessarily transition into a pulse-related state
until the next pulse.

It is possible to avoid these unnecessary transitions by traversing the job
update events and initializing the last pulse timestamp to the timestamp of the
most recent event, provided that event's status is not an awaiting-pulse state.

Bugs closed: AURORA-1890

Reviewed at https://reviews.apache.org/r/56723/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4ab4b2b2
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4ab4b2b2
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4ab4b2b2

Branch: refs/heads/master
Commit: 4ab4b2b2c4ce7c6cc4e79aacde0cb54275f82b67
Parents: 9ea8979
Author: Zameer Manji <zma...@apache.org>
Authored: Thu Feb 16 12:08:34 2017 -0800
Committer: Zameer Manji <zma...@apache.org>
Committed: Thu Feb 16 12:08:34 2017 -0800

----------------------------------------------------------------------
 .../thrift/org/apache/aurora/gen/api.thrift     |  3 +
 .../org/apache/aurora/scheduler/base/Jobs.java  | 10 +++
 .../updater/JobUpdateControllerImpl.java        | 36 +++++++++--
 .../aurora/scheduler/updater/JobUpdaterIT.java  | 64 +++++++++++++++++++-
 4 files changed, 108 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift 
b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index efd4e53..3749531 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -619,6 +619,9 @@ const set<JobUpdateStatus> ACTIVE_JOB_UPDATE_STATES = 
[JobUpdateStatus.ROLLING_F
                                                        
JobUpdateStatus.ROLL_BACK_PAUSED,
                                                        
JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE,
                                                        
JobUpdateStatus.ROLL_BACK_AWAITING_PULSE]
+/** States the job update can be in while waiting for a pulse. */
+const set<JobUpdateStatus> AWAITNG_PULSE_JOB_UPDATE_STATES = 
[JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE,
+                                                              
JobUpdateStatus.ROLL_BACK_AWAITING_PULSE]
 
 /** Job update actions that can be applied to job instances. */
 enum JobUpdateAction {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java 
b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
index 49e5b2c..8d5f4e5 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
@@ -13,8 +13,12 @@
  */
 package org.apache.aurora.scheduler.base;
 
+import java.util.EnumSet;
+
 import org.apache.aurora.gen.JobStats;
+import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.storage.entities.IJobStats;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -28,6 +32,12 @@ public final class Jobs {
   }
 
   /**
+   * States of updates that are blocked on pulses.
+   */
+  public static final EnumSet<JobUpdateStatus> AWAITING_PULSE_STATES =
+      EnumSet.copyOf(apiConstants.AWAITNG_PULSE_JOB_UPDATE_STATES);
+
+  /**
    * For a given collection of tasks compute statistics based on the state of 
the task.
    *
    * @param tasks a collection of tasks for which statistics are sought

http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
 
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 729c123..e141124 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 
 import org.apache.aurora.common.application.Lifecycle;
@@ -84,6 +86,7 @@ import static 
org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
 import static 
org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
 import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
+import static org.apache.aurora.scheduler.base.Jobs.AWAITING_PULSE_STATES;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static 
org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
 import static 
org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
@@ -202,7 +205,7 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
       JobUpdateStatus status = ROLLING_FORWARD;
       if (isCoordinatedUpdate(instructions)) {
         status = ROLL_FORWARD_AWAITING_PULSE;
-        pulseHandler.initializePulseState(update, status);
+        pulseHandler.initializePulseState(update, status, 0L);
       }
 
       recordAndChangeJobUpdateStatus(
@@ -271,6 +274,29 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
     return status -> addAuditData(newEvent(status), auditData);
   }
 
+  private static final Ordering<IJobUpdateEvent> CHRON_ORDERING =
+      Ordering.from(Comparator.comparingLong(IJobUpdateEvent::getTimestampMs));
+
+  private long inferLastPulseTimestamp(IJobUpdateDetails details) {
+    // Pulse timestamps are not durably stored by design. However, on system 
recovery,
+    // setting the timestamp of the last pulse to 0L (aka no pulse) is not 
correct.
+    // By inspecting the job update events we can infer a reasonable time 
stamp to initialize to.
+    // In this case, if the upgrade was not waiting for a pulse previously, we 
can reuse the
+    // timestamp of the last event. This does reset the counter for pulses, 
but reflects the
+    // most likely behaviour of a healthy system.
+
+    // This is safe because we always write at least one job update event on 
job update creation
+    IJobUpdateEvent mostRecent = CHRON_ORDERING.max(details.getUpdateEvents());
+
+    long ts = 0L;
+
+    if (!AWAITING_PULSE_STATES.contains(mostRecent.getStatus())) {
+      ts = mostRecent.getTimestampMs();
+    }
+
+    return ts;
+  }
+
   @Override
   public void systemResume() {
     storage.write((NoResult.Quiet) storeProvider -> {
@@ -284,7 +310,9 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
 
         if (isCoordinatedUpdate(instructions)) {
           LOG.info("Automatically restoring pulse state for " + key);
-          pulseHandler.initializePulseState(details.getUpdate(), status);
+
+          long pulseMs = inferLastPulseTimestamp(details);
+          pulseHandler.initializePulseState(details.getUpdate(), status, 
pulseMs);
         }
 
         if (AUTO_RESUME_STATES.contains(status)) {
@@ -769,11 +797,11 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
       this.clock = requireNonNull(clock);
     }
 
-    synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus 
status) {
+    synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus 
status, long ts) {
       pulseStates.put(update.getSummary().getKey(), new PulseState(
           status,
           update.getInstructions().getSettings().getBlockIfNoPulsesAfterMs(),
-          0L));
+          ts));
     }
 
     synchronized PulseState pulseAndGet(IJobUpdateKey key) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java 
b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index ea0b89a..30b44f8 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 import com.google.common.eventbus.EventBus;
+import com.google.common.primitives.Ints;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -142,6 +143,8 @@ public class JobUpdaterIT extends EasyMockTest {
   private static final Amount<Long, Time> WATCH_TIMEOUT = Amount.of(2000L, 
Time.MILLISECONDS);
   private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, 
Time.MILLISECONDS);
   private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
+  private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
+  private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, 
Time.MINUTES);
   private static final ITaskConfig OLD_CONFIG =
       setExecutorData(TaskTestUtil.makeConfig(JOB), "olddata");
   private static final ITaskConfig NEW_CONFIG = setExecutorData(OLD_CONFIG, 
"newdata");
@@ -470,6 +473,8 @@ public class JobUpdaterIT extends EasyMockTest {
     storage.write(
         storeProvider -> saveJobUpdate(storeProvider.getJobUpdateStore(), 
update, ROLLING_FORWARD));
 
+    clock.advance(ONE_MINUTE);
+
     subscriber.startAsync().awaitRunning();
     ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = 
ImmutableMultimap.builder();
 
@@ -494,6 +499,53 @@ public class JobUpdaterIT extends EasyMockTest {
   }
 
   @Test
+  public void testRecoverLongPulseTimeoutCoordinatedUpdateFromStorage() throws 
Exception {
+    // A brief failover in the middle of a rolling forward update with a long 
pulse timeout should
+    // mean that after scheduler startup the update is not waiting for a pulse.
+    expectTaskKilled().times(1);
+
+    control.replay();
+
+    JobUpdate builder =
+        setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 
1).newBuilder();
+    builder.getInstructions().getSettings()
+        
.setBlockIfNoPulsesAfterMs(Ints.checkedCast(ONE_HOUR.as(Time.MILLISECONDS)));
+    IJobUpdate update = IJobUpdate.build(builder);
+    insertInitialTasks(update);
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    clock.advance(ONE_DAY);
+
+    storage.write(storeProvider ->
+        saveJobUpdate(storeProvider.getJobUpdateStore(), update, 
ROLL_FORWARD_AWAITING_PULSE));
+
+    // The first pulse comes after one minute
+    clock.advance(ONE_MINUTE);
+
+    storage.write(
+        (NoResult.Quiet) storeProvider ->
+            saveJobUpdateEvent(storeProvider.getJobUpdateStore(), update, 
ROLLING_FORWARD));
+
+    clock.advance(ONE_MINUTE);
+
+    subscriber.startAsync().awaitRunning();
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = 
ImmutableMultimap.builder();
+
+    actions.putAll(0, INSTANCE_UPDATING);
+    // Since the pulse interval is so large and the downtime was so short, the 
update does not need
+    // to wait for a pulse.
+    assertState(ROLLING_FORWARD, actions.build());
+
+    // Instance 0 is updated.
+    changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    actions.putAll(0, INSTANCE_UPDATED);
+
+    assertState(ROLLED_FORWARD, actions.build());
+    assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID));
+  }
+
+  @Test
   public void testRecoverAwaitingPulseFromStorage() throws Exception {
     expectTaskKilled();
 
@@ -676,6 +728,8 @@ public class JobUpdaterIT extends EasyMockTest {
     storage.write(
         storeProvider -> saveJobUpdate(storeProvider.getJobUpdateStore(), 
update, ROLLING_FORWARD));
 
+    clock.advance(ONE_MINUTE);
+
     subscriber.startAsync().awaitRunning();
 
     ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = 
ImmutableMultimap.builder();
@@ -1142,13 +1196,21 @@ public class JobUpdaterIT extends EasyMockTest {
     }
 
     store.saveJobUpdate(update, Optional.of(lock.getToken()));
+    saveJobUpdateEvent(store, update, status);
+    return lock;
+  }
+
+  private void saveJobUpdateEvent(
+      JobUpdateStore.Mutable store,
+      IJobUpdate update,
+      JobUpdateStatus status) {
+
     store.saveJobUpdateEvent(
         update.getSummary().getKey(),
         IJobUpdateEvent.build(
             new JobUpdateEvent()
                 .setStatus(status)
                 .setTimestampMs(clock.nowMillis())));
-    return lock;
   }
 
   @Test

Reply via email to