Author: adulceanu Date: Tue Jan 30 13:27:58 2018 New Revision: 1822640 URL: http://svn.apache.org/viewvc?rev=1822640&view=rev Log: OAK-7162 - Race condition on revisions head between compaction and scheduler could result in skipped commit. Moved the head state update into a loop. Added upper bound, backoff and logging.
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java?rev=1822640&r1=1822639&r2=1822640&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java Tue Jan 30 13:27:58 2018 @@ -19,20 +19,25 @@ package org.apache.jackrabbit.oak.segmen import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.Thread.currentThread; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.jackrabbit.oak.api.Type.LONG; import java.io.Closeable; +import java.text.MessageFormat; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingWindowReservoir; import javax.annotation.Nonnull; - import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.segment.Revisions; @@ -51,9 +56,6 @@ import org.apache.jackrabbit.oak.stats.S import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.SlidingWindowReservoir; - public class LockBasedScheduler implements Scheduler { public static class LockBasedSchedulerBuilder { @@ -123,6 +125,12 @@ public class LockBasedScheduler implemen .parseDouble(System.getProperty("oak.scheduler.fetch.commitDelayQuantile", "0.5")); /** + * Maximum number of milliseconds to wait before re-attempting to update the current + * head state after a successful commit, provided a concurrent head state update happens. + */ + private static final long MAXIMUM_BACKOFF = MILLISECONDS.convert(10, SECONDS); + + /** * Sets the number of seconds to wait for the attempt to grab the lock to * create a checkpoint */ @@ -149,6 +157,7 @@ public class LockBasedScheduler implemen private final Histogram commitTimeHistogram = new Histogram(new SlidingWindowReservoir(1000)); + private final Random random = new Random(); public LockBasedScheduler(LockBasedSchedulerBuilder builder) { if (COMMIT_FAIR_LOCK) { @@ -244,19 +253,39 @@ public class LockBasedScheduler implemen } } - private NodeState execute(Commit commit) throws CommitFailedException { + private NodeState execute(Commit commit) throws CommitFailedException, InterruptedException { // only do the merge if there are some changes to commit if (commit.hasChanges()) { - refreshHead(true); - SegmentNodeState before = head.get(); - SegmentNodeState after = commit.apply(before); - if (revisions.setHead(before.getRecordId(), after.getRecordId())) { - head.set(after); - contentChanged(after.getChildNode(ROOT), commit.info()); + long start = System.nanoTime(); + + int count = 0; + for (long backoff = 1; backoff < MAXIMUM_BACKOFF; backoff *= 2) { refreshHead(true); + SegmentNodeState before = head.get(); + SegmentNodeState after = commit.apply(before); + + if (revisions.setHead(before.getRecordId(), after.getRecordId())) { + head.set(after); + contentChanged(after.getChildNode(ROOT), commit.info()); + 
refreshHead(true); + + return head.get().getChildNode(ROOT); + } + + count++; + int randNs = random.nextInt(1_000_000); + log.info("Scheduler detected concurrent commits. Retrying after {} ms and {} ns", backoff, randNs); + Thread.sleep(backoff, randNs); } + + long finish = System.nanoTime(); + + String message = MessageFormat.format( + "The commit could not be executed after {} attempts. Total wait time: {} ms", + count, NANOSECONDS.toMillis(finish - start)); + throw new CommitFailedException("Segment", 3, message); } - + return head.get().getChildNode(ROOT); }