Author: mreutegg Date: Thu May 22 18:44:23 2014 New Revision: 1596936 URL: http://svn.apache.org/r1596936 Log: OAK-1854: Duplicate revisions
Modified: jackrabbit/oak/branches/1.0/ (props changed) jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java Propchange: jackrabbit/oak/branches/1.0/ ------------------------------------------------------------------------------ Merged /jackrabbit/oak/trunk:r1593245,1593250 Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java?rev=1596936&r1=1596935&r2=1596936&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java Thu May 22 18:44:23 2014 @@ -157,6 +157,12 @@ public class Revision { long timestamp = getCurrentTimestamp(); int c; synchronized (Revision.class) { + // need to check again, because threads + // could arrive inside the synchronized block + // out of order + if (timestamp < lastRevisionTimestamp) { + timestamp = lastRevisionTimestamp; + } if (timestamp == lastRevisionTimestamp) { c = ++lastRevisionCount; } else { Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java?rev=1596936&r1=1596935&r2=1596936&view=diff ============================================================================== --- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java (original) +++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java Thu May 22 18:44:23 2014 @@ -16,12 +16,26 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.jackrabbit.oak.plugins.document.Revision.RevisionComparator; import org.junit.Test; @@ -283,4 +297,109 @@ public class RevisionTest { assertEquals(new Revision(0x30, 0, 0), comp.getRevisionSeen(r21)); } + @Test + public void uniqueRevision2() throws Exception { + List<Thread> threads = new ArrayList<Thread>(); + final AtomicBoolean stop = new AtomicBoolean(); + final Set<Revision> set = Collections + .synchronizedSet(new HashSet<Revision>()); + final Revision[] duplicate = new Revision[1]; + for (int i = 0; i < 20; i++) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + Revision[] last = new Revision[1024]; + while (!stop.get()) { + for (Revision r : last) { + set.remove(r); + } + for (int i = 0; i < last.length; i++) { + last[i] = Revision.newRevision(1); + } + for (Revision r : last) { + if (!set.add(r)) { + duplicate[0] = r; + } + } + } + } + }); + thread.start(); + threads.add(thread); + } + Thread.sleep(200); + stop.set(true); + for (Thread t : threads) { + t.join(); + } + assertNull("Duplicate revision", duplicate[0]); + } + + @Test + public void uniqueRevision() throws Exception { + //Revision.setClock(new Clock.Virtual()); + final BlockingQueue<Revision> revisionQueue = Queues.newLinkedBlockingQueue(); + int noOfThreads = 60; + final int noOfLoops = 1000; + List<Thread> workers = new ArrayList<Thread>(); + final AtomicBoolean stop = new AtomicBoolean(); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch stopLatch = new CountDownLatch(noOfThreads); + for (int i = 0; i < noOfThreads; i++) { + workers.add(new Thread(new Runnable() { + @Override + public void run() { + Uninterruptibles.awaitUninterruptibly(startLatch); + for (int j = 0; j < noOfLoops && !stop.get(); j++) { + revisionQueue.add(Revision.newRevision(1)); + } + stopLatch.countDown(); + } + })); + } + + final List<Revision> duplicates = Lists.newArrayList(); + final Set<Revision> seenRevs = Sets.newHashSet(); + workers.add(new Thread(new Runnable() { + @Override + public void run() { + startLatch.countDown(); + + while (!stop.get()) { + List<Revision> revs = Lists.newArrayList(); + Queues.drainUninterruptibly(revisionQueue, revs, 5, 100, TimeUnit.MILLISECONDS); + record(revs); + } + + List<Revision> revs = Lists.newArrayList(); + revisionQueue.drainTo(revs); + record(revs); + } + + private void record(List<Revision> revs) { + for (Revision rev : revs) { + if (!seenRevs.add(rev)) { + duplicates.add(rev); + } + } + + if (!duplicates.isEmpty()) { + stop.set(true); + } + } + })); + + for (Thread t : workers) { + t.start(); + } + + stopLatch.await(); + stop.set(true); + + for (Thread t : workers) { + t.join(); + } + assertTrue(String.format("Duplicate rev seen %s %n Seen %s", duplicates, seenRevs), duplicates.isEmpty()); + } + }