Author: mreutegg
Date: Wed Jun 18 10:31:30 2014
New Revision: 1603401

URL: http://svn.apache.org/r1603401
Log:
OAK-1897: Stale documents in MongoDocumentStore cache

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1603401&r1=1603400&r2=1603401&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Wed Jun 18 10:31:30 2014
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -79,6 +80,8 @@ import com.mongodb.QueryBuilder;
 import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * A document store that uses MongoDB as the backend.
  */
@@ -114,7 +117,17 @@ public class MongoDocumentStore implemen
     /**
      * Locks to ensure cache consistency on reads, writes and invalidation.
      */
-    private final Striped<Lock> locks = Striped.lock(64);
+    private final Striped<Lock> locks = Striped.lock(128);
+
+    /**
+     * ReadWriteLocks to synchronize cache access when child documents are
+     * requested from MongoDB and put into the cache. Accessing a single
+     * document in the cache will acquire a read (shared) lock for the parent
+     * key in addition to the lock (from {@link #locks}) for the individual
+     * document. Reading multiple sibling documents will acquire a write
+     * (exclusive) lock for the parent key. See OAK-1897.
+     */
+    private final Striped<ReadWriteLock> parentLocks = Striped.readWriteLock(64);
 
     /**
      * Comparator for maps with {@link Revision} keys. The maps are ordered
@@ -246,7 +259,7 @@ public class MongoDocumentStore implemen
     @Override
     public <T extends Document> void invalidateCache(Collection<T> collection, String key) {
         if (collection == Collection.NODES) {
-            Lock lock = getAndLock(key);
+            TreeLock lock = acquire(key);
             try {
                 nodesCache.invalidate(new StringValue(key));
             } finally {
@@ -298,7 +311,7 @@ public class MongoDocumentStore implemen
             }
         }
         try {
-            Lock lock = getAndLock(key);
+            TreeLock lock = acquire(key);
             try {
                 if (maxCacheAge == 0) {
                     invalidateCache(collection, key);
@@ -400,10 +413,11 @@ public class MongoDocumentStore implemen
             queryBuilder.greaterThanEquals(startValue);
         }
         DBObject query = queryBuilder.get();
+        String parentId = Utils.getParentIdFromLowerLimit(fromKey);
+        TreeLock lock = acquireExclusive(parentId != null ? parentId : "");
         long start = start();
         try {
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
-            String parentId = Utils.getParentIdFromLowerLimit(fromKey);
             ReadPreference readPreference =
                     getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection));
 
@@ -422,28 +436,23 @@ public class MongoDocumentStore implemen
                     if (collection == Collection.NODES && doc != null) {
                         doc.seal();
                         String id = doc.getId();
-                        Lock lock = getAndLock(id);
                         CacheValue cacheKey = new StringValue(id);
-                        try {
-                            // do not overwrite document in cache if the
-                            // existing one in the cache is newer
-                            NodeDocument cached = nodesCache.getIfPresent(cacheKey);
-                            if (cached != null && cached != NodeDocument.NULL) {
-                                // check mod count
-                                Number cachedModCount = cached.getModCount();
-                                Number modCount = doc.getModCount();
-                                if (cachedModCount == null || modCount == null) {
-                                    throw new IllegalStateException(
-                                            "Missing " + Document.MOD_COUNT);
-                                }
-                                if (modCount.longValue() > cachedModCount.longValue()) {
-                                    nodesCache.put(cacheKey, (NodeDocument) doc);
-                                }
-                            } else {
+                        // do not overwrite document in cache if the
+                        // existing one in the cache is newer
+                        NodeDocument cached = nodesCache.getIfPresent(cacheKey);
+                        if (cached != null && cached != NodeDocument.NULL) {
+                            // check mod count
+                            Number cachedModCount = cached.getModCount();
+                            Number modCount = doc.getModCount();
+                            if (cachedModCount == null || modCount == null) {
+                                throw new IllegalStateException(
+                                        "Missing " + Document.MOD_COUNT);
+                            }
+                            if (modCount.longValue() > cachedModCount.longValue()) {
                                 nodesCache.put(cacheKey, (NodeDocument) doc);
                             }
-                        } finally {
-                            lock.unlock();
+                        } else {
+                            nodesCache.put(cacheKey, (NodeDocument) doc);
                         }
                     }
                     list.add(doc);
@@ -453,6 +462,7 @@ public class MongoDocumentStore implemen
             }
             return list;
         } finally {
+            lock.unlock();
             end("query", start);
         }
     }
@@ -497,7 +507,7 @@ public class MongoDocumentStore implemen
         updateOp = updateOp.copy();
         DBObject update = createUpdate(updateOp);
 
-        Lock lock = getAndLock(updateOp.getId());
+        TreeLock lock = acquire(updateOp.getId());
         long start = start();
         try {
             // get modCount of cached document
@@ -630,7 +640,7 @@ public class MongoDocumentStore implemen
                 }
                 if (collection == Collection.NODES) {
                     for (T doc : docs) {
-                        Lock lock = getAndLock(doc.getId());
+                        TreeLock lock = acquire(doc.getId());
                         try {
                             addToCache((NodeDocument) doc);
                         } finally {
@@ -670,19 +680,21 @@ public class MongoDocumentStore implemen
                 if (writeResult.getError() != null) {
                     throw new MicroKernelException("Update failed: " + writeResult.getError());
                 }
-                // update cache
-                for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) {
-                    Lock lock = getAndLock(entry.getKey());
-                    try {
-                        if (entry.getValue() == null) {
-                            // make sure concurrently loaded document is invalidated
-                            nodesCache.invalidate(new StringValue(entry.getKey()));
-                        } else {
-                            applyToCache(Collection.NODES, entry.getValue(),
-                                    updateOp.shallowCopy(entry.getKey()));
+                if (collection == Collection.NODES) {
+                    // update cache
+                    for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) {
+                        TreeLock lock = acquire(entry.getKey());
+                        try {
+                            if (entry.getValue() == null) {
+                                // make sure concurrently loaded document is invalidated
+                                nodesCache.invalidate(new StringValue(entry.getKey()));
+                            } else {
+                                applyToCache(Collection.NODES, entry.getValue(),
+                                        updateOp.shallowCopy(entry.getKey()));
+                            }
+                        } finally {
+                            lock.unlock();
                         }
-                    } finally {
-                        lock.unlock();
                     }
                 }
             } catch (MongoException e) {
@@ -1040,10 +1052,42 @@ public class MongoDocumentStore implemen
         return update;
     }
 
-    private Lock getAndLock(String key) {
-        Lock l = locks.get(key);
-        l.lock();
-        return l;
+    /**
+     * Returns the parent id for the given id. An empty String is returned if
+     * the given value is the id of the root document or the id for a long path.
+     *
+     * @param id an id for a document.
+     * @return the id of the parent document or the empty String.
+     */
+    @Nonnull
+    private static String getParentId(@Nonnull String id) {
+        String parentId = Utils.getParentId(checkNotNull(id));
+        if (parentId == null) {
+            parentId = "";
+        }
+        return parentId;
+    }
+
+    /**
+     * Acquires a lock for the given key. The returned tree lock will also hold
+     * a shared lock on the parent key.
+     *
+     * @param key a key.
+     * @return the acquired lock for the given key.
+     */
+    private TreeLock acquire(String key) {
+        return TreeLock.shared(parentLocks.get(getParentId(key)), locks.get(key));
+    }
+
+    /**
+     * Acquires an exclusive lock on the given parent key. Use this method to
+     * block cache access for child keys of the given parent key.
+     *
+     * @param parentKey the parent key.
+     * @return the acquired lock for the given parent key.
+     */
+    private TreeLock acquireExclusive(String parentKey) {
+        return TreeLock.exclusive(parentLocks.get(parentKey));
     }
 
     @Override
@@ -1082,4 +1126,38 @@ public class MongoDocumentStore implemen
     void setClock(Clock clock) {
         this.clock = clock;
     }
+
+    private final static class TreeLock {
+
+        private final Lock parentLock;
+        private final Lock lock;
+
+        private TreeLock(Lock parentLock, Lock lock) {
+            this.parentLock = parentLock;
+            this.lock = lock;
+        }
+
+        static TreeLock shared(ReadWriteLock parentLock, Lock lock) {
+            return new TreeLock(parentLock.readLock(), lock).lock();
+        }
+
+        static TreeLock exclusive(ReadWriteLock parentLock) {
+            return new TreeLock(parentLock.writeLock(), null).lock();
+        }
+
+        private TreeLock lock() {
+            parentLock.lock();
+            if (lock != null) {
+                lock.lock();
+            }
+            return this;
+        }
+
+        private void unlock() {
+            if (lock != null) {
+                lock.unlock();
+            }
+            parentLock.unlock();
+        }
+    }
 }
\ No newline at end of file
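
The TreeLock introduced above combines a shared/exclusive lock on the parent key
with the existing per-document lock. The following stand-alone Java sketch mirrors
that idea with plain java.util.concurrent locks; it is an illustration only, not
part of the commit, and the class and method names below are made up:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Single-document cache access takes a shared (read) lock on the parent key
    // plus an exclusive lock on the document key. A query for sibling documents
    // takes an exclusive (write) lock on the parent key, so it cannot interleave
    // with single-document cache updates under the same parent (see OAK-1897).
    class TreeLockSketch {

        private final ReadWriteLock parentLock = new ReentrantReadWriteLock();
        private final Lock keyLock = new ReentrantLock();

        // analogous to acquire(key): shared on the parent, exclusive on the key
        void singleDocumentAccess(Runnable cacheUpdate) {
            parentLock.readLock().lock();
            keyLock.lock();
            try {
                cacheUpdate.run();
            } finally {
                keyLock.unlock();
                parentLock.readLock().unlock();
            }
        }

        // analogous to acquireExclusive(parentKey): blocks all child-key access
        void siblingQuery(Runnable populateCache) {
            parentLock.writeLock().lock();
            try {
                populateCache.run();
            } finally {
                parentLock.writeLock().unlock();
            }
        }
    }

In the actual change both lock sets are striped (Striped.lock(128) for document
keys, Striped.readWriteLock(64) for parent keys), so keys that hash to the same
stripe share a lock instance.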

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1603401&r1=1603400&r2=1603401&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Jun 18 10:31:30 2014
@@ -23,10 +23,8 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.sql.Timestamp;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -258,6 +256,7 @@ public class Utils {
      * <ul>
      *     <li>If id is from long path</li>
      *     <li>If id is for root path</li>
+     *     <li>If id is for an invalid path</li>
      * </ul>
      *</p>
      * @param id id for which parent id needs to be determined
@@ -269,6 +268,9 @@ public class Utils {
             return null;
         }
         String path = Utils.getPathFromId(id);
+        if (!PathUtils.isValid(path)) {
+            return null;
+        }
         if(PathUtils.denotesRoot(path)){
             return null;
         }
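
The PathUtils.isValid() guard added above extends the null cases of
Utils.getParentId() (the Utils class modified in this hunk): ids whose path part
is not a valid path now also yield null, in addition to root and long-path ids.
A rough illustration, assuming the "<depth>:<path>" id form used by the document
store; the concrete values are illustrative only, not taken from this commit:

    // regular id: the parent id is the id of the parent path
    String p1 = Utils.getParentId("2:/foo/bar");   // "1:/foo"
    // root document has no parent
    String p2 = Utils.getParentId("0:/");          // null
    // invalid path part (empty path element): null after this change
    String p3 = Utils.getParentId("2:/foo//bar");  // null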

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1603401&r1=1603400&r2=1603401&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Wed Jun 18 10:31:30 2014
@@ -212,7 +212,7 @@ public class BasicDocumentStoreTest exte
     @Test
     public void testQueryCollation() {
         // create ten documents
-        String base = this.getClass().getName() + ".testQueryCollation";
+        String base = "2:/" + this.getClass().getName() + ".testQueryCollation";
         List<UpdateOp> creates = new ArrayList<UpdateOp>();
 
         List<String>expected = new ArrayList<String>();

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java?rev=1603401&r1=1603400&r2=1603401&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CacheConsistencyTest.java Wed Jun 18 10:31:30 2014
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.Map;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -26,7 +27,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -55,7 +55,6 @@ public class CacheConsistencyTest extend
         mk = builder.setDocumentStore(store).open();
     }
 
-    @Ignore("OAK-1897")
     @Test
     public void cacheConsistency() throws Exception {
         mk.commit("/", "+\"node\":{}", null, null);
@@ -82,8 +81,18 @@ public class CacheConsistencyTest extend
             Thread.sleep(10);
         }
 
-        // trigger write back of _lastRevs
-        mk.runBackgroundOperations();
+        final Semaphore done = new Semaphore(0);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // trigger write back of _lastRevs
+                mk.runBackgroundOperations();
+                done.release();
+            }
+        }).start();
+
+        // wait at most one second for background thread
+        done.tryAcquire(1, TimeUnit.SECONDS);
 
         // release thread
         store.semaphores.get(t).release();
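
The test change above runs the background operations on a separate thread and
bounds the wait, since they may block until the paused reader thread is released.
The general shape of that bounded-wait handshake, as a small sketch (the helper
name is made up, not part of the test):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    class BoundedWaitSketch {

        // Run a possibly blocking task on its own thread and continue after at
        // most one second, so the caller cannot hang indefinitely.
        static void runWithBoundedWait(final Runnable possiblyBlocking)
                throws InterruptedException {
            final Semaphore done = new Semaphore(0);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    possiblyBlocking.run();   // e.g. mk.runBackgroundOperations()
                    done.release();
                }
            }).start();
            // proceed either way after the timeout
            done.tryAcquire(1, TimeUnit.SECONDS);
        }
    }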

