Author: mreutegg
Date: Thu Nov 14 14:25:30 2013
New Revision: 1541915
URL: http://svn.apache.org/r1541915
Log:
OAK-1170: Inconsistent reads with concurrent benchmark tests
- make external changes visible only via the background read, based on the
_lastRev available on the root document
- remove MongoNodeStore.publishRevision()
- adjust some tests because the implementation no longer tries to make
external changes visible on a conflict
- enable test added for this issue
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Collision.java
Thu Nov 14 14:25:30 2013
@@ -67,12 +67,14 @@ class Collision {
* marked if it is not yet committed, otherwise our revision is marked.
*
* @param store the document store.
+ * @return the revision that was marked. Either our or their.
* @throws MicroKernelException if the mark operation fails.
*/
- void mark(DocumentStore store) throws MicroKernelException {
+ @Nonnull
+ Revision mark(DocumentStore store) throws MicroKernelException {
// first try to mark their revision
if (markCommitRoot(document, theirRev, store)) {
- return;
+ return theirRev;
}
// their commit wins, we have to mark ourRev
NodeDocument newDoc = Collection.NODES.newDocument(store);
@@ -83,6 +85,7 @@ class Collision {
+ "with collision marker. Our revision: " + ourRev
+ ", document:\n" + newDoc.format());
}
+ return ourRev;
}
/**
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
Thu Nov 14 14:25:30 2013
@@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -50,6 +49,7 @@ public class Commit {
private final Revision revision;
private HashMap<String, UpdateOp> operations = new HashMap<String,
UpdateOp>();
private JsopWriter diff = new JsopStream();
+ private List<Revision> collisions = new ArrayList<Revision>();
/**
* List of all node paths which have been modified in this commit. In
addition to the nodes
@@ -340,19 +340,16 @@ public class Commit {
* @param op the operation
*/
public void createOrUpdateNode(DocumentStore store, UpdateOp op) {
+ collisions.clear();
NodeDocument doc = store.createOrUpdate(Collection.NODES, op);
if (baseRevision != null) {
- final AtomicReference<List<Revision>> collisions = new
AtomicReference<List<Revision>>();
Revision newestRev = null;
if (doc != null) {
newestRev = doc.getNewestRevision(nodeStore, revision,
new CollisionHandler() {
@Override
void concurrentModification(Revision other) {
- if (collisions.get() == null) {
- collisions.set(new ArrayList<Revision>());
- }
- collisions.get().add(other);
+ collisions.add(other);
}
});
}
@@ -375,21 +372,37 @@ public class Commit {
baseRevision;
}
}
+ if (conflictMessage == null) {
+ // the modification was successful
+ // -> check for collisions and conflict (concurrent updates
+ // on a node are possible if property updates do not overlap)
+ // TODO: unify above conflict detection and isConflicting()
+ if (!collisions.isEmpty() && isConflicting(doc, op)) {
+ for (Revision r : collisions) {
+ // mark collisions on commit root
+ Collision c = new Collision(doc, r, op, revision,
nodeStore);
+ if (c.mark(store).equals(revision)) {
+ // our revision was marked
+ if (baseRevision.isBranch()) {
+ // this is a branch commit. do not fail
immediately
+ // merging this branch will fail later.
+ } else {
+ // fail immediately
+ conflictMessage = "The node " +
+ op.getId() + " was changed in
revision\n" + r +
+ ", which was applied after the base
revision\n" +
+ baseRevision;
+ }
+ }
+ }
+ }
+ }
if (conflictMessage != null) {
conflictMessage += ", before\n" + revision +
"; document:\n" + (doc == null ? "" : doc.format()) +
",\nrevision order:\n" +
nodeStore.getRevisionComparator();
throw new MicroKernelException(conflictMessage);
}
- // if we get here the modification was successful
- // -> check for collisions and conflict (concurrent updates
- // on a node are possible if property updates do not overlap)
- if (collisions.get() != null && isConflicting(doc, op)) {
- for (Revision r : collisions.get()) {
- // mark collisions on commit root
- new Collision(doc, r, op, revision, nodeStore).mark(store);
- }
- }
}
if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
Thu Nov 14 14:25:30 2013
@@ -263,12 +263,11 @@ public final class MongoNodeStore
builder.getWeigher(), builder.getDocChildrenCacheSize());
// check if root node exists
- Revision head = newRevision();
- Node n = readNode("/", head);
- if (n == null) {
+ if (store.find(Collection.NODES, Utils.getIdFromPath("/")) == null) {
// root node is missing: repository is not initialized
+ Revision head = newRevision();
Commit commit = new Commit(this, null, head);
- n = new Node("/", head);
+ Node n = new Node("/", head);
commit.addNode(n);
commit.applyToDocumentStore();
commit.applyToCache(false);
@@ -284,8 +283,8 @@ public final class MongoNodeStore
// no revision read from other cluster nodes
setHeadRevision(newRevision());
}
- getRevisionComparator().add(headRevision, Revision.newRevision(0));
}
+ getRevisionComparator().add(headRevision, Revision.newRevision(0));
dispatcher = new ChangeDispatcher(getRoot());
commitQueue = new CommitQueue(this, dispatcher);
backgroundThread = new Thread(
@@ -934,37 +933,6 @@ public final class MongoNodeStore
}
@Override
- public void publishRevision(Revision foreignRevision, Revision
changeRevision) {
- Revision.RevisionComparator revisionComparator =
getRevisionComparator();
- if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
- // already visible
- return;
- }
- int clusterNodeId = foreignRevision.getClusterId();
- if (clusterNodeId == this.clusterId) {
- return;
- }
- // the (old) head occurred first
- Revision headSeen = Revision.newRevision(0);
- // then we saw this new revision (from another cluster node)
- Revision otherSeen = Revision.newRevision(0);
- // and after that, the current change
- Revision changeSeen = Revision.newRevision(0);
- revisionComparator.add(foreignRevision, otherSeen);
- // TODO invalidating the whole cache is not really needed,
- // but how to ensure we invalidate the right part of the cache?
- // possibly simply wait for the background thread to pick
- // up the changes, but this depends on how often this method is called
- store.invalidateCache();
- // the latest revisions of the current cluster node
- // happened before the latest revisions of other cluster nodes
- revisionComparator.add(headRevision, headSeen);
- revisionComparator.add(changeRevision, changeSeen);
- // the head revision is after other revisions
- setHeadRevision(Revision.newRevision(clusterId));
- }
-
- @Override
public int getClusterId() {
return clusterId;
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/NodeDocument.java
Thu Nov 14 14:25:30 2013
@@ -324,21 +324,14 @@ public class NodeDocument extends Docume
for (Revision r : Iterables.mergeSorted(
Arrays.asList(revisions.keySet(), commitRoots.keySet()),
revisions.comparator())) {
- if (isRevisionNewer(context, r, changeRev)) {
- // we have seen a previous change from another cluster node
- // (which might be conflicting or not) - we need to make
- // sure this change is visible from now on
- // TODO verify this is really needed
- context.publishRevision(r, changeRev);
- }
if (!r.equals(changeRev)) {
- if (!isValidRevision(context, r, changeRev, new
HashSet<Revision>())) {
- handler.concurrentModification(r);
- } else {
+ if (isValidRevision(context, r, changeRev, new
HashSet<Revision>())) {
newestRev = r;
// found newest revision, no need to check more revisions
// revisions are sorted newest first
break;
+ } else {
+ handler.concurrentModification(r);
}
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Revision.java
Thu Nov 14 14:25:30 2013
@@ -380,13 +380,9 @@ public class Revision {
return;
}
if (last.revision.compareRevisionTime(r) > 0) {
- /*
throw new IllegalArgumentException(
"Can not add an earlier revision: " +
last.revision + " > " + r +
"; current cluster node is " +
currentClusterNodeId);
- */
- // quick fix for OAK-1167
- return;
}
newList = new ArrayList<RevisionRange>(list);
}
@@ -413,6 +409,9 @@ public class Revision {
}
Revision range1 = getRevisionSeen(o1);
Revision range2 = getRevisionSeen(o2);
+ if (range1 == FUTURE && range2 == FUTURE) {
+ return o1.compareRevisionTime(o2);
+ }
if (range1 == null || range2 == null) {
return o1.compareRevisionTime(o2);
}
@@ -438,6 +437,11 @@ public class Revision {
private Revision getRevisionSeen(Revision r) {
List<RevisionRange> list = map.get(r.getClusterId());
if (list == null) {
+ if (r.getClusterId() != currentClusterNodeId) {
+ // this is from a cluster node we did not see yet
+ // see also OAK-1170
+ return FUTURE;
+ }
return null;
}
// search from latest backward
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/RevisionContext.java
Thu Nov 14 14:25:30 2013
@@ -40,15 +40,6 @@ public interface RevisionContext {
Comparator<Revision> getRevisionComparator();
/**
- * Ensure the revision visible from now on, possibly by updating the head
- * revision, so that the changes that occurred are visible.
- *
- * @param foreignRevision the revision from another cluster node
- * @param changeRevision the local revision that is sorted after the
foreign revision
- */
- void publishRevision(Revision foreignRevision, Revision changeRevision);
-
- /**
* @return the cluster id of the local MongoMK instance.
*/
int getClusterId();
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterJoinTest.java
Thu Nov 14 14:25:30 2013
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertEqu
*/
public class ClusterJoinTest extends AbstractMongoConnectionTest {
- @Ignore
@Test
public void nodeJoins() throws Exception {
String rev1, rev2, rev3;
@@ -77,7 +76,7 @@ public class ClusterJoinTest extends Abs
}
assertNodesExist(rev3, "/" + name);
}
- // must only see all nodes @rev3
+ // must now see all nodes @rev3
assertEquals(5L, obj.get(":childNodeCount"));
} finally {
mk2.dispose();
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java?rev=1541915&r1=1541914&r2=1541915&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ClusterTest.java
Thu Nov 14 14:25:30 2013
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.fail;
import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.blobs.BlobStore;
import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
import org.junit.After;
import org.junit.Before;
@@ -119,34 +120,27 @@ public class ClusterTest {
public void openCloseOpen() {
MemoryDocumentStore ds = new MemoryDocumentStore();
MemoryBlobStore bs = new MemoryBlobStore();
- MongoMK.Builder builder;
-
- builder = new MongoMK.Builder();
- builder.setDocumentStore(ds).setBlobStore(bs);
- MongoMK mk1 = builder.setClusterId(1).open();
+
+ MongoMK mk1 = createMK(1, 0, ds, bs);
mk1.commit("/", "+\"a\": {}", null, null);
mk1.commit("/", "-\"a\"", null, null);
+ mk1.runBackgroundOperations();
- builder = new MongoMK.Builder();
- builder.setDocumentStore(ds).setBlobStore(bs);
- MongoMK mk2 = builder.setClusterId(2).open();
+ MongoMK mk2 = createMK(2, 0, ds, bs);
mk2.commit("/", "+\"a\": {}", null, null);
mk2.commit("/", "-\"a\"", null, null);
+ mk2.runBackgroundOperations();
- builder = new MongoMK.Builder();
- builder.setDocumentStore(ds).setBlobStore(bs);
- MongoMK mk3 = builder.setClusterId(3).open();
+ MongoMK mk3 = createMK(3, 0, ds, bs);
mk3.commit("/", "+\"a\": {}", null, null);
mk3.commit("/", "-\"a\"", null, null);
+ mk3.runBackgroundOperations();
- builder = new MongoMK.Builder();
- builder.setDocumentStore(ds).setBlobStore(bs);
- MongoMK mk4 = builder.setClusterId(4).open();
+ MongoMK mk4 = createMK(4, 0, ds, bs);
mk4.commit("/", "+\"a\": {}", null, null);
+ mk4.runBackgroundOperations();
- builder = new MongoMK.Builder();
- builder.setDocumentStore(ds).setBlobStore(bs);
- MongoMK mk5 = builder.setClusterId(5).open();
+ MongoMK mk5 = createMK(5, 0, ds, bs);
mk5.commit("/", "-\"a\"", null, null);
mk5.commit("/", "+\"a\": {}", null, null);
@@ -227,8 +221,8 @@ public class ClusterTest {
@Test
public void conflict() {
- MongoMK mk1 = createMK(1);
- MongoMK mk2 = createMK(2);
+ MongoMK mk1 = createMK(1, 0);
+ MongoMK mk2 = createMK(2, 0);
String m1r0 = mk1.getHeadRevision();
String m2r0 = mk2.getHeadRevision();
@@ -240,8 +234,10 @@ public class ClusterTest {
} catch (MicroKernelException e) {
// expected
}
- // now, after the conflict, both cluster nodes see the node
- // (before the conflict, this isn't necessarily the case for mk2)
+ mk1.runBackgroundOperations();
+ mk2.runBackgroundOperations();
+
+ // node becomes visible after running background operations
String n1 = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 10, null);
String n2 = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 10, null);
assertEquals(n1, n2);
@@ -319,10 +315,14 @@ public class ClusterTest {
}
private MongoMK createMK(int clusterId) {
- MongoMK.Builder builder = new MongoMK.Builder();
+ return createMK(clusterId, 10);
+ }
+
+ private MongoMK createMK(int clusterId, int asyncDelay) {
if (MONGO_DB) {
DB db = MongoUtils.getConnection().getDB();
- builder.setMongoDB(db);
+ return new MongoMK.Builder().setMongoDB(db)
+ .setClusterId(clusterId).setAsyncDelay(asyncDelay).open();
} else {
if (ds == null) {
ds = new MemoryDocumentStore();
@@ -330,10 +330,14 @@ public class ClusterTest {
if (bs == null) {
bs = new MemoryBlobStore();
}
- builder.setDocumentStore(ds).setBlobStore(bs);
+ return createMK(clusterId, asyncDelay, ds, bs);
}
- builder.setAsyncDelay(10);
- return builder.setClusterId(clusterId).open();
+ }
+
+ private MongoMK createMK(int clusterId, int asyncDelay,
+ DocumentStore ds, BlobStore bs) {
+ return new MongoMK.Builder().setDocumentStore(ds).setBlobStore(bs)
+ .setClusterId(clusterId).setAsyncDelay(asyncDelay).open();
}
}