Author: mreutegg Date: Wed Jun 18 10:31:30 2014 New Revision: 1603401 URL: http://svn.apache.org/r1603401 Log: OAK-1897: Stale documents in MongoDocumentStore cache
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1603401&r1=1603400&r2=1603401&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Wed Jun 18 10:31:30 2014 @@ -29,6 +29,7 @@ import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -79,6 +80,8 @@ import com.mongodb.QueryBuilder; import com.mongodb.WriteConcern; import com.mongodb.WriteResult; +import static com.google.common.base.Preconditions.checkNotNull; + /** * A document store that uses MongoDB as the backend. */ @@ -114,7 +117,17 @@ public class MongoDocumentStore implemen /** * Locks to ensure cache consistency on reads, writes and invalidation. 
*/ - private final Striped<Lock> locks = Striped.lock(64); + private final Striped<Lock> locks = Striped.lock(128); + + /** + * ReadWriteLocks to synchronize cache access when child documents are + * requested from MongoDB and put into the cache. Accessing a single + * document in the cache will acquire a read (shared) lock for the parent + * key in addition to the lock (from {@link #locks}) for the individual + * document. Reading multiple sibling documents will acquire a write + * (exclusive) lock for the parent key. See OAK-1897. + */ + private final Striped<ReadWriteLock> parentLocks = Striped.readWriteLock(64); /** * Comparator for maps with {@link Revision} keys. The maps are ordered @@ -246,7 +259,7 @@ public class MongoDocumentStore implemen @Override public <T extends Document> void invalidateCache(Collection<T> collection, String key) { if (collection == Collection.NODES) { - Lock lock = getAndLock(key); + TreeLock lock = acquire(key); try { nodesCache.invalidate(new StringValue(key)); } finally { @@ -298,7 +311,7 @@ public class MongoDocumentStore implemen } } try { - Lock lock = getAndLock(key); + TreeLock lock = acquire(key); try { if (maxCacheAge == 0) { invalidateCache(collection, key); @@ -400,10 +413,11 @@ public class MongoDocumentStore implemen queryBuilder.greaterThanEquals(startValue); } DBObject query = queryBuilder.get(); + String parentId = Utils.getParentIdFromLowerLimit(fromKey); + TreeLock lock = acquireExclusive(parentId != null ? 
parentId : ""); long start = start(); try { DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC); - String parentId = Utils.getParentIdFromLowerLimit(fromKey); ReadPreference readPreference = getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection)); @@ -422,28 +436,23 @@ public class MongoDocumentStore implemen if (collection == Collection.NODES && doc != null) { doc.seal(); String id = doc.getId(); - Lock lock = getAndLock(id); CacheValue cacheKey = new StringValue(id); - try { - // do not overwrite document in cache if the - // existing one in the cache is newer - NodeDocument cached = nodesCache.getIfPresent(cacheKey); - if (cached != null && cached != NodeDocument.NULL) { - // check mod count - Number cachedModCount = cached.getModCount(); - Number modCount = doc.getModCount(); - if (cachedModCount == null || modCount == null) { - throw new IllegalStateException( - "Missing " + Document.MOD_COUNT); - } - if (modCount.longValue() > cachedModCount.longValue()) { - nodesCache.put(cacheKey, (NodeDocument) doc); - } - } else { + // do not overwrite document in cache if the + // existing one in the cache is newer + NodeDocument cached = nodesCache.getIfPresent(cacheKey); + if (cached != null && cached != NodeDocument.NULL) { + // check mod count + Number cachedModCount = cached.getModCount(); + Number modCount = doc.getModCount(); + if (cachedModCount == null || modCount == null) { + throw new IllegalStateException( + "Missing " + Document.MOD_COUNT); + } + if (modCount.longValue() > cachedModCount.longValue()) { nodesCache.put(cacheKey, (NodeDocument) doc); } - } finally { - lock.unlock(); + } else { + nodesCache.put(cacheKey, (NodeDocument) doc); } } list.add(doc); @@ -453,6 +462,7 @@ public class MongoDocumentStore implemen } return list; } finally { + lock.unlock(); end("query", start); } } @@ -497,7 +507,7 @@ public class MongoDocumentStore implemen updateOp = updateOp.copy(); DBObject update = createUpdate(updateOp); - Lock lock 
= getAndLock(updateOp.getId()); + TreeLock lock = acquire(updateOp.getId()); long start = start(); try { // get modCount of cached document @@ -630,7 +640,7 @@ public class MongoDocumentStore implemen } if (collection == Collection.NODES) { for (T doc : docs) { - Lock lock = getAndLock(doc.getId()); + TreeLock lock = acquire(doc.getId()); try { addToCache((NodeDocument) doc); } finally { @@ -670,19 +680,21 @@ public class MongoDocumentStore implemen if (writeResult.getError() != null) { throw new MicroKernelException("Update failed: " + writeResult.getError()); } - // update cache - for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) { - Lock lock = getAndLock(entry.getKey()); - try { - if (entry.getValue() == null) { - // make sure concurrently loaded document is invalidated - nodesCache.invalidate(new StringValue(entry.getKey())); - } else { - applyToCache(Collection.NODES, entry.getValue(), - updateOp.shallowCopy(entry.getKey())); + if (collection == Collection.NODES) { + // update cache + for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) { + TreeLock lock = acquire(entry.getKey()); + try { + if (entry.getValue() == null) { + // make sure concurrently loaded document is invalidated + nodesCache.invalidate(new StringValue(entry.getKey())); + } else { + applyToCache(Collection.NODES, entry.getValue(), + updateOp.shallowCopy(entry.getKey())); + } + } finally { + lock.unlock(); } - } finally { - lock.unlock(); } } } catch (MongoException e) { @@ -1040,10 +1052,42 @@ public class MongoDocumentStore implemen return update; } - private Lock getAndLock(String key) { - Lock l = locks.get(key); - l.lock(); - return l; + /** + * Returns the parent id for the given id. An empty String is returned if + * the given value is the id of the root document or the id for a long path. + * + * @param id an id for a document. + * @return the id of the parent document or the empty String. 
+ */ + @Nonnull + private static String getParentId(@Nonnull String id) { + String parentId = Utils.getParentId(checkNotNull(id)); + if (parentId == null) { + parentId = ""; + } + return parentId; + } + + /** + * Acquires a lock for the given key. The returned tree lock will also hold + * a shared lock on the parent key. + * + * @param key a key. + * @return the acquired lock for the given key. + */ + private TreeLock acquire(String key) { + return TreeLock.shared(parentLocks.get(getParentId(key)), locks.get(key)); + } + + /** + * Acquires an exclusive lock on the given parent key. Use this method to + * block cache access for child keys of the given parent key. + * + * @param parentKey the parent key. + * @return the acquired lock for the given parent key. + */ + private TreeLock acquireExclusive(String parentKey) { + return TreeLock.exclusive(parentLocks.get(parentKey)); } @Override @@ -1082,4 +1126,38 @@ public class MongoDocumentStore implemen void setClock(Clock clock) { this.clock = clock; } + + private final static class TreeLock { + + private final Lock parentLock; + private final Lock lock; + + private TreeLock(Lock parentLock, Lock lock) { + this.parentLock = parentLock; + this.lock = lock; + } + + static TreeLock shared(ReadWriteLock parentLock, Lock lock) { + return new TreeLock(parentLock.readLock(), lock).lock(); + } + + static TreeLock exclusive(ReadWriteLock parentLock) { + return new TreeLock(parentLock.writeLock(), null).lock(); + } + + private TreeLock lock() { + parentLock.lock(); + if (lock != null) { + lock.lock(); + } + return this; + } + + private void unlock() { + if (lock != null) { + lock.unlock(); + } + parentLock.unlock(); + } + } } \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1603401&r1=1603400&r2=1603401&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Jun 18 10:31:30 2014 @@ -23,10 +23,8 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.sql.Timestamp; import java.util.Comparator; -import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -258,6 +256,7 @@ public class Utils { * <ul> * <li>If id is from long path</li> * <li>If id is for root path</li> + * <li>If id is for an invalid path</li> * </ul> *</p> * @param id id for which parent id needs to be determined @@ -269,6 +268,9 @@ public class Utils { return null; } String path = Utils.getPathFromId(id); + if (!PathUtils.isValid(path)) { + return null; + } if(PathUtils.denotesRoot(path)){ return null; } Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1603401&r1=1603400&r2=1603401&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Wed Jun 18 10:31:30 2014 @@ -212,7 +212,7 @@ public class BasicDocumentStoreTest exte @Test public void 
testQueryCollation() { // create ten documents - String base = this.getClass().getName() + ".testQueryCollation"; + String base = "2:/" + this.getClass().getName() + ".testQueryCollation"; List<UpdateOp> creates = new ArrayList<UpdateOp>(); List<String>expected = new ArrayList<String>(); Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java?rev=1603401&r1=1603400&r2=1603401&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java (original) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java Wed Jun 18 10:31:30 2014 @@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -26,7 +27,6 @@ import org.apache.jackrabbit.oak.plugins import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.Maps; @@ -55,7 +55,6 @@ public class CacheConsistencyTest extend mk = builder.setDocumentStore(store).open(); } - @Ignore("OAK-1897") @Test public void cacheConsistency() throws Exception { mk.commit("/", "+\"node\":{}", null, null); @@ -82,8 +81,18 @@ public class CacheConsistencyTest extend Thread.sleep(10); } - // trigger write back of _lastRevs - mk.runBackgroundOperations(); + final Semaphore done = new Semaphore(0); + new Thread(new Runnable() { + @Override + public void run() { + // trigger write back of _lastRevs + 
mk.runBackgroundOperations(); + done.release(); + } + }).start(); + + // wait at most one second for background thread + done.tryAcquire(1, TimeUnit.SECONDS); // release thread store.semaphores.get(t).release();