Repository: aurora Updated Branches: refs/heads/master 9ea897978 -> 4ab4b2b2c
Add best-effort pulse timestamp recovery. Currently the scheduler forces all coordinated ("pulsed") updates into ROLL_FORWARD_AWAITING_PULSE or ROLL_BACK_AWAITING_PULSE on scheduler startup/recovery. This is because the last pulse timestamp is not durably stored and the timestamp of the last pulse is set to 0L (aka no pulse yet). In cases where the pulse timeout is large and the failover is fast or frequent, this causes many updates to unnecessarily transition into a pulse-related state until the next pulse. It is possible to avoid these unnecessary transitions by traversing the job update events and initializing the last pulse timestamp to the timestamp of the last event if the last event was not an awaiting-pulse event. Bugs closed: AURORA-1890 Reviewed at https://reviews.apache.org/r/56723/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4ab4b2b2 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4ab4b2b2 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4ab4b2b2 Branch: refs/heads/master Commit: 4ab4b2b2c4ce7c6cc4e79aacde0cb54275f82b67 Parents: 9ea8979 Author: Zameer Manji <zma...@apache.org> Authored: Thu Feb 16 12:08:34 2017 -0800 Committer: Zameer Manji <zma...@apache.org> Committed: Thu Feb 16 12:08:34 2017 -0800 ---------------------------------------------------------------------- .../thrift/org/apache/aurora/gen/api.thrift | 3 + .../org/apache/aurora/scheduler/base/Jobs.java | 10 +++ .../updater/JobUpdateControllerImpl.java | 36 +++++++++-- .../aurora/scheduler/updater/JobUpdaterIT.java | 64 +++++++++++++++++++- 4 files changed, 108 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift 
b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index efd4e53..3749531 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -619,6 +619,9 @@ const set<JobUpdateStatus> ACTIVE_JOB_UPDATE_STATES = [JobUpdateStatus.ROLLING_F JobUpdateStatus.ROLL_BACK_PAUSED, JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE, JobUpdateStatus.ROLL_BACK_AWAITING_PULSE] +/** States the job update can be in while waiting for a pulse. */ +const set<JobUpdateStatus> AWAITNG_PULSE_JOB_UPDATE_STATES = [JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE, + JobUpdateStatus.ROLL_BACK_AWAITING_PULSE] /** Job update actions that can be applied to job instances. */ enum JobUpdateAction { http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/main/java/org/apache/aurora/scheduler/base/Jobs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java index 49e5b2c..8d5f4e5 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java @@ -13,8 +13,12 @@ */ package org.apache.aurora.scheduler.base; +import java.util.EnumSet; + import org.apache.aurora.gen.JobStats; +import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.apiConstants; import org.apache.aurora.scheduler.storage.entities.IJobStats; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -28,6 +32,12 @@ public final class Jobs { } /** + * States of updates that are blocked on pulses. + */ + public static final EnumSet<JobUpdateStatus> AWAITING_PULSE_STATES = + EnumSet.copyOf(apiConstants.AWAITNG_PULSE_JOB_UPDATE_STATES); + + /** * For a given collection of tasks compute statistics based on the state of the task. 
* * @param tasks a collection of tasks for which statistics are sought http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 729c123..e141124 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.updater; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.aurora.common.application.Lifecycle; @@ -84,6 +86,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE; import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError; +import static org.apache.aurora.scheduler.base.Jobs.AWAITING_PULSE_STATES; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES; @@ -202,7 +205,7 @@ class JobUpdateControllerImpl implements JobUpdateController { JobUpdateStatus status = ROLLING_FORWARD; if (isCoordinatedUpdate(instructions)) { status = 
ROLL_FORWARD_AWAITING_PULSE; - pulseHandler.initializePulseState(update, status); + pulseHandler.initializePulseState(update, status, 0L); } recordAndChangeJobUpdateStatus( @@ -271,6 +274,29 @@ class JobUpdateControllerImpl implements JobUpdateController { return status -> addAuditData(newEvent(status), auditData); } + private static final Ordering<IJobUpdateEvent> CHRON_ORDERING = + Ordering.from(Comparator.comparingLong(IJobUpdateEvent::getTimestampMs)); + + private long inferLastPulseTimestamp(IJobUpdateDetails details) { + // Pulse timestamps are not durably stored by design. However, on system recovery, + // setting the timestamp of the last pulse to 0L (aka no pulse) is not correct. + // By inspecting the job update events we can infer a reasonable time stamp to initialize to. + // In this case, if the upgrade was not waiting for a pulse previously, we can reuse the + // timestamp of the last event. This does reset the counter for pulses, but reflects the + // most likely behaviour of a healthy system. 
+ + // This is safe because we always write at least one job update event on job update creation + IJobUpdateEvent mostRecent = CHRON_ORDERING.max(details.getUpdateEvents()); + + long ts = 0L; + + if (!AWAITING_PULSE_STATES.contains(mostRecent.getStatus())) { + ts = mostRecent.getTimestampMs(); + } + + return ts; + } + @Override public void systemResume() { storage.write((NoResult.Quiet) storeProvider -> { @@ -284,7 +310,9 @@ class JobUpdateControllerImpl implements JobUpdateController { if (isCoordinatedUpdate(instructions)) { LOG.info("Automatically restoring pulse state for " + key); - pulseHandler.initializePulseState(details.getUpdate(), status); + + long pulseMs = inferLastPulseTimestamp(details); + pulseHandler.initializePulseState(details.getUpdate(), status, pulseMs); } if (AUTO_RESUME_STATES.contains(status)) { @@ -769,11 +797,11 @@ class JobUpdateControllerImpl implements JobUpdateController { this.clock = requireNonNull(clock); } - synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus status) { + synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus status, long ts) { pulseStates.put(update.getSummary().getKey(), new PulseState( status, update.getInstructions().getSettings().getBlockIfNoPulsesAfterMs(), - 0L)); + ts)); } synchronized PulseState pulseAndGet(IJobUpdateKey key) { http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index ea0b89a..30b44f8 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -30,6 +30,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import 
com.google.common.collect.Ordering; import com.google.common.eventbus.EventBus; +import com.google.common.primitives.Ints; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -142,6 +143,8 @@ public class JobUpdaterIT extends EasyMockTest { private static final Amount<Long, Time> WATCH_TIMEOUT = Amount.of(2000L, Time.MILLISECONDS); private static final Amount<Long, Time> FLAPPING_THRESHOLD = Amount.of(1L, Time.MILLISECONDS); private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS); + private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS); + private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES); private static final ITaskConfig OLD_CONFIG = setExecutorData(TaskTestUtil.makeConfig(JOB), "olddata"); private static final ITaskConfig NEW_CONFIG = setExecutorData(OLD_CONFIG, "newdata"); @@ -470,6 +473,8 @@ public class JobUpdaterIT extends EasyMockTest { storage.write( storeProvider -> saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD)); + clock.advance(ONE_MINUTE); + subscriber.startAsync().awaitRunning(); ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); @@ -494,6 +499,53 @@ public class JobUpdaterIT extends EasyMockTest { } @Test + public void testRecoverLongPulseTimeoutCoordinatedUpdateFromStorage() throws Exception { + // A brief failover in the middle of a rolling forward update with a long pulse timeout should + // mean that after scheduler startup the update is not waiting for a pulse. 
+ expectTaskKilled().times(1); + + control.replay(); + + JobUpdate builder = + setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder(); + builder.getInstructions().getSettings() + .setBlockIfNoPulsesAfterMs(Ints.checkedCast(ONE_HOUR.as(Time.MILLISECONDS))); + IJobUpdate update = IJobUpdate.build(builder); + insertInitialTasks(update); + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + clock.advance(ONE_DAY); + + storage.write(storeProvider -> + saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_AWAITING_PULSE)); + + // The first pulse comes after one minute + clock.advance(ONE_MINUTE); + + storage.write( + (NoResult.Quiet) storeProvider -> + saveJobUpdateEvent(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD)); + + clock.advance(ONE_MINUTE); + + subscriber.startAsync().awaitRunning(); + ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); + + actions.putAll(0, INSTANCE_UPDATING); + // Since the pulse interval is so large and the downtime was so short, the update does not need + // to wait for a pulse. + assertState(ROLLING_FORWARD, actions.build()); + + // Instance 0 is updated. 
+ changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + actions.putAll(0, INSTANCE_UPDATED); + + assertState(ROLLED_FORWARD, actions.build()); + assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID)); + } + + @Test public void testRecoverAwaitingPulseFromStorage() throws Exception { expectTaskKilled(); @@ -676,6 +728,8 @@ public class JobUpdaterIT extends EasyMockTest { storage.write( storeProvider -> saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD)); + clock.advance(ONE_MINUTE); + subscriber.startAsync().awaitRunning(); ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder(); @@ -1142,13 +1196,21 @@ public class JobUpdaterIT extends EasyMockTest { } store.saveJobUpdate(update, Optional.of(lock.getToken())); + saveJobUpdateEvent(store, update, status); + return lock; + } + + private void saveJobUpdateEvent( + JobUpdateStore.Mutable store, + IJobUpdate update, + JobUpdateStatus status) { + store.saveJobUpdateEvent( update.getSummary().getKey(), IJobUpdateEvent.build( new JobUpdateEvent() .setStatus(status) .setTimestampMs(clock.nowMillis()))); - return lock; } @Test