Author: mreutegg
Date: Thu Nov 14 14:25:30 2013
New Revision: 1541915

URL: http://svn.apache.org/r1541915
Log:
OAK-1170: Inconsistent reads with concurrent benchmark tests
- make external changes visible only during the background read, based on the
_lastRev available on the root document
- remove MongoNodeStore.publishRevision()
- adjust some tests because the implementation no longer tries to make
external changes visible on a conflict
- enable test added for this issue

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
 Thu Nov 14 14:25:30 2013
@@ -67,12 +67,14 @@ class Collision {
      * marked if it is not yet committed, otherwise our revision is marked.
      * 
      * @param store the document store.
+     * @return the revision that was marked. Either our or their.
      * @throws MicroKernelException if the mark operation fails.
      */
-    void mark(DocumentStore store) throws MicroKernelException {
+    @Nonnull
+    Revision mark(DocumentStore store) throws MicroKernelException {
         // first try to mark their revision
         if (markCommitRoot(document, theirRev, store)) {
-            return;
+            return theirRev;
         }
         // their commit wins, we have to mark ourRev
         NodeDocument newDoc = Collection.NODES.newDocument(store);
@@ -83,6 +85,7 @@ class Collision {
                     + "with collision marker. Our revision: " + ourRev
                     + ", document:\n" + newDoc.format());
         }
+        return ourRev;
     }
 
     /**

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
 Thu Nov 14 14:25:30 2013
@@ -21,7 +21,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -50,6 +49,7 @@ public class Commit {
     private final Revision revision;
     private HashMap<String, UpdateOp> operations = new HashMap<String, 
UpdateOp>();
     private JsopWriter diff = new JsopStream();
+    private List<Revision> collisions = new ArrayList<Revision>();
 
     /**
      * List of all node paths which have been modified in this commit. In 
addition to the nodes
@@ -340,19 +340,16 @@ public class Commit {
      * @param op the operation
      */
     public void createOrUpdateNode(DocumentStore store, UpdateOp op) {
+        collisions.clear();
         NodeDocument doc = store.createOrUpdate(Collection.NODES, op);
         if (baseRevision != null) {
-            final AtomicReference<List<Revision>> collisions = new 
AtomicReference<List<Revision>>();
             Revision newestRev = null;
             if (doc != null) {
                 newestRev = doc.getNewestRevision(nodeStore, revision,
                         new CollisionHandler() {
                             @Override
                             void concurrentModification(Revision other) {
-                                if (collisions.get() == null) {
-                                    collisions.set(new ArrayList<Revision>());
-                                }
-                                collisions.get().add(other);
+                                collisions.add(other);
                             }
                         });
             }
@@ -375,21 +372,37 @@ public class Commit {
                             baseRevision;
                 }
             }
+            if (conflictMessage == null) {
+                // the modification was successful
+                // -> check for collisions and conflict (concurrent updates
+                // on a node are possible if property updates do not overlap)
+                // TODO: unify above conflict detection and isConflicting()
+                if (!collisions.isEmpty() && isConflicting(doc, op)) {
+                    for (Revision r : collisions) {
+                        // mark collisions on commit root
+                        Collision c = new Collision(doc, r, op, revision, 
nodeStore);
+                        if (c.mark(store).equals(revision)) {
+                            // our revision was marked
+                            if (baseRevision.isBranch()) {
+                                // this is a branch commit. do not fail 
immediately
+                                // merging this branch will fail later.
+                            } else {
+                                // fail immediately
+                                conflictMessage = "The node " +
+                                        op.getId() + " was changed in 
revision\n" + r +
+                                        ", which was applied after the base 
revision\n" +
+                                        baseRevision;
+                            }
+                        }
+                    }
+                }
+            }
             if (conflictMessage != null) {
                 conflictMessage += ", before\n" + revision + 
                         "; document:\n" + (doc == null ? "" : doc.format()) +
                         ",\nrevision order:\n" + 
nodeStore.getRevisionComparator();
                 throw new MicroKernelException(conflictMessage);
             }
-            // if we get here the modification was successful
-            // -> check for collisions and conflict (concurrent updates
-            // on a node are possible if property updates do not overlap)
-            if (collisions.get() != null && isConflicting(doc, op)) {
-                for (Revision r : collisions.get()) {
-                    // mark collisions on commit root
-                    new Collision(doc, r, op, revision, nodeStore).mark(store);
-                }
-            }
         }
 
         if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
 Thu Nov 14 14:25:30 2013
@@ -263,12 +263,11 @@ public final class MongoNodeStore
                 builder.getWeigher(), builder.getDocChildrenCacheSize());
 
         // check if root node exists
-        Revision head = newRevision();
-        Node n = readNode("/", head);
-        if (n == null) {
+        if (store.find(Collection.NODES, Utils.getIdFromPath("/")) == null) {
             // root node is missing: repository is not initialized
+            Revision head = newRevision();
             Commit commit = new Commit(this, null, head);
-            n = new Node("/", head);
+            Node n = new Node("/", head);
             commit.addNode(n);
             commit.applyToDocumentStore();
             commit.applyToCache(false);
@@ -284,8 +283,8 @@ public final class MongoNodeStore
                 // no revision read from other cluster nodes
                 setHeadRevision(newRevision());
             }
-            getRevisionComparator().add(headRevision, Revision.newRevision(0));
         }
+        getRevisionComparator().add(headRevision, Revision.newRevision(0));
         dispatcher = new ChangeDispatcher(getRoot());
         commitQueue = new CommitQueue(this, dispatcher);
         backgroundThread = new Thread(
@@ -934,37 +933,6 @@ public final class MongoNodeStore
     }
 
     @Override
-    public void publishRevision(Revision foreignRevision, Revision 
changeRevision) {
-        Revision.RevisionComparator revisionComparator = 
getRevisionComparator();
-        if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
-            // already visible
-            return;
-        }
-        int clusterNodeId = foreignRevision.getClusterId();
-        if (clusterNodeId == this.clusterId) {
-            return;
-        }
-        // the (old) head occurred first
-        Revision headSeen = Revision.newRevision(0);
-        // then we saw this new revision (from another cluster node)
-        Revision otherSeen = Revision.newRevision(0);
-        // and after that, the current change
-        Revision changeSeen = Revision.newRevision(0);
-        revisionComparator.add(foreignRevision, otherSeen);
-        // TODO invalidating the whole cache is not really needed,
-        // but how to ensure we invalidate the right part of the cache?
-        // possibly simply wait for the background thread to pick
-        // up the changes, but this depends on how often this method is called
-        store.invalidateCache();
-        // the latest revisions of the current cluster node
-        // happened before the latest revisions of other cluster nodes
-        revisionComparator.add(headRevision, headSeen);
-        revisionComparator.add(changeRevision, changeSeen);
-        // the head revision is after other revisions
-        setHeadRevision(Revision.newRevision(clusterId));
-    }
-
-    @Override
     public int getClusterId() {
         return clusterId;
     }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
 Thu Nov 14 14:25:30 2013
@@ -324,21 +324,14 @@ public class NodeDocument extends Docume
         for (Revision r : Iterables.mergeSorted(
                 Arrays.asList(revisions.keySet(), commitRoots.keySet()),
                 revisions.comparator())) {
-            if (isRevisionNewer(context, r, changeRev)) {
-                // we have seen a previous change from another cluster node
-                // (which might be conflicting or not) - we need to make
-                // sure this change is visible from now on
-                // TODO verify this is really needed
-                context.publishRevision(r, changeRev);
-            }
             if (!r.equals(changeRev)) {
-                if (!isValidRevision(context, r, changeRev, new 
HashSet<Revision>())) {
-                    handler.concurrentModification(r);
-                } else {
+                if (isValidRevision(context, r, changeRev, new 
HashSet<Revision>())) {
                     newestRev = r;
                     // found newest revision, no need to check more revisions
                     // revisions are sorted newest first
                     break;
+                } else {
+                    handler.concurrentModification(r);
                 }
             }
         }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
 Thu Nov 14 14:25:30 2013
@@ -380,13 +380,9 @@ public class Revision {
                         return;
                     }
                     if (last.revision.compareRevisionTime(r) > 0) {
-                        /*
                         throw new IllegalArgumentException(
                                 "Can not add an earlier revision: " + 
last.revision + " > " + r + 
                                 "; current cluster node is " + 
currentClusterNodeId);
-                        */
-                        // quick fix for OAK-1167
-                        return;
                     }
                     newList = new ArrayList<RevisionRange>(list);
                 }
@@ -413,6 +409,9 @@ public class Revision {
             }
             Revision range1 = getRevisionSeen(o1);
             Revision range2 = getRevisionSeen(o2);
+            if (range1 == FUTURE && range2 == FUTURE) {
+                return o1.compareRevisionTime(o2);
+            }
             if (range1 == null || range2 == null) {
                 return o1.compareRevisionTime(o2);
             }
@@ -438,6 +437,11 @@ public class Revision {
         private Revision getRevisionSeen(Revision r) {
             List<RevisionRange> list = map.get(r.getClusterId());
             if (list == null) {
+                if (r.getClusterId() != currentClusterNodeId) {
+                    // this is from a cluster node we did not see yet
+                    // see also OAK-1170
+                    return FUTURE;
+                }
                 return null;
             }
             // search from latest backward

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
 Thu Nov 14 14:25:30 2013
@@ -40,15 +40,6 @@ public interface RevisionContext {
     Comparator<Revision> getRevisionComparator();
 
     /**
-     * Ensure the revision visible from now on, possibly by updating the head
-     * revision, so that the changes that occurred are visible.
-     *
-     * @param foreignRevision the revision from another cluster node
-     * @param changeRevision the local revision that is sorted after the 
foreign revision
-     */
-    void publishRevision(Revision foreignRevision, Revision changeRevision);
-
-    /**
      * @return the cluster id of the local MongoMK instance.
      */
     int getClusterId();

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
 Thu Nov 14 14:25:30 2013
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertEqu
  */
 public class ClusterJoinTest extends AbstractMongoConnectionTest {
 
-    @Ignore
     @Test
     public void nodeJoins() throws Exception {
         String rev1, rev2, rev3;
@@ -77,7 +76,7 @@ public class ClusterJoinTest extends Abs
                 }
                 assertNodesExist(rev3, "/" + name);
             }
-            // must only see all nodes @rev3
+            // must now see all nodes @rev3
             assertEquals(5L, obj.get(":childNodeCount"));
         } finally {
             mk2.dispose();

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
 Thu Nov 14 14:25:30 2013
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.fail;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.blobs.BlobStore;
 import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
 import org.junit.After;
 import org.junit.Before;
@@ -119,34 +120,27 @@ public class ClusterTest {
     public void openCloseOpen() {
         MemoryDocumentStore ds = new MemoryDocumentStore();
         MemoryBlobStore bs = new MemoryBlobStore();
-        MongoMK.Builder builder;
-        
-        builder = new MongoMK.Builder();
-        builder.setDocumentStore(ds).setBlobStore(bs);
-        MongoMK mk1 = builder.setClusterId(1).open();
+
+        MongoMK mk1 = createMK(1, 0, ds, bs);
         mk1.commit("/", "+\"a\": {}", null, null);
         mk1.commit("/", "-\"a\"", null, null);
+        mk1.runBackgroundOperations();
         
-        builder = new MongoMK.Builder();
-        builder.setDocumentStore(ds).setBlobStore(bs);
-        MongoMK mk2 = builder.setClusterId(2).open();
+        MongoMK mk2 = createMK(2, 0, ds, bs);
         mk2.commit("/", "+\"a\": {}", null, null);
         mk2.commit("/", "-\"a\"", null, null);
+        mk2.runBackgroundOperations();
 
-        builder = new MongoMK.Builder();
-        builder.setDocumentStore(ds).setBlobStore(bs);
-        MongoMK mk3 = builder.setClusterId(3).open();
+        MongoMK mk3 = createMK(3, 0, ds, bs);
         mk3.commit("/", "+\"a\": {}", null, null);
         mk3.commit("/", "-\"a\"", null, null);
+        mk3.runBackgroundOperations();
 
-        builder = new MongoMK.Builder();
-        builder.setDocumentStore(ds).setBlobStore(bs);
-        MongoMK mk4 = builder.setClusterId(4).open();
+        MongoMK mk4 = createMK(4, 0, ds, bs);
         mk4.commit("/", "+\"a\": {}", null, null);
+        mk4.runBackgroundOperations();
 
-        builder = new MongoMK.Builder();
-        builder.setDocumentStore(ds).setBlobStore(bs);
-        MongoMK mk5 = builder.setClusterId(5).open();
+        MongoMK mk5 = createMK(5, 0, ds, bs);
         mk5.commit("/", "-\"a\"", null, null);
         mk5.commit("/", "+\"a\": {}", null, null);
 
@@ -227,8 +221,8 @@ public class ClusterTest {
     
     @Test
     public void conflict() {
-        MongoMK mk1 = createMK(1);
-        MongoMK mk2 = createMK(2);
+        MongoMK mk1 = createMK(1, 0);
+        MongoMK mk2 = createMK(2, 0);
         
         String m1r0 = mk1.getHeadRevision();
         String m2r0 = mk2.getHeadRevision();
@@ -240,8 +234,10 @@ public class ClusterTest {
         } catch (MicroKernelException e) {
             // expected
         }
-        // now, after the conflict, both cluster nodes see the node
-        // (before the conflict, this isn't necessarily the case for mk2)
+        mk1.runBackgroundOperations();
+        mk2.runBackgroundOperations();
+
+        // node becomes visible after running background operations
         String n1 = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 10, null);
         String n2 = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 10, null);
         assertEquals(n1, n2);
@@ -319,10 +315,14 @@ public class ClusterTest {
     }
 
     private MongoMK createMK(int clusterId) {
-        MongoMK.Builder builder = new MongoMK.Builder();
+        return createMK(clusterId, 10);
+    }
+
+    private MongoMK createMK(int clusterId, int asyncDelay) {
         if (MONGO_DB) {
             DB db = MongoUtils.getConnection().getDB();
-            builder.setMongoDB(db);
+            return new MongoMK.Builder().setMongoDB(db)
+                    .setClusterId(clusterId).setAsyncDelay(asyncDelay).open();
         } else {
             if (ds == null) {
                 ds = new MemoryDocumentStore();
@@ -330,10 +330,14 @@ public class ClusterTest {
             if (bs == null) {
                 bs = new MemoryBlobStore();
             }
-            builder.setDocumentStore(ds).setBlobStore(bs);
+            return createMK(clusterId, asyncDelay, ds, bs);
         }
-        builder.setAsyncDelay(10);
-        return builder.setClusterId(clusterId).open();
+    }
+
+    private MongoMK createMK(int clusterId, int asyncDelay,
+                           DocumentStore ds, BlobStore bs) {
+        return new MongoMK.Builder().setDocumentStore(ds).setBlobStore(bs)
+                .setClusterId(clusterId).setAsyncDelay(asyncDelay).open();
     }
 
 }


Reply via email to