Author: mreutegg
Date: Thu Jun 19 09:35:52 2014
New Revision: 1603784

URL: http://svn.apache.org/r1603784
Log:
OAK-1645: Route find queries to Mongo secondary in MongoDocumentStore

Merged r1594159-1594164,1594166-1594167,1594169,1594808 from trunk

Added:
    
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
      - copied, changed from r1594161, 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PropertyHistory.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
    
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
    jackrabbit/oak/branches/1.0/oak-doc/   (props changed)
    jackrabbit/oak/branches/1.0/oak-doc/src/site/markdown/documentmk.md

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1594159-1594164,1594166-1594167,1594169,1594808

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
 Thu Jun 19 09:35:52 2014
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.io.InputStream;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -463,6 +464,7 @@ public class DocumentMK implements Micro
         private boolean useSimpleRevision;
         private long splitDocumentAgeMillis = 5 * 60 * 1000;
         private long offHeapCacheSize = -1;
+        private long maxReplicationLagMillis = TimeUnit.HOURS.toMillis(6);
         private boolean disableBranches;
         private Clock clock = Clock.SIMPLE;
         private Executor executor;
@@ -731,6 +733,15 @@ public class DocumentMK implements Micro
             return clock;
         }
 
+        public Builder setMaxReplicationLag(long duration, TimeUnit unit){
+            maxReplicationLagMillis = unit.toMillis(duration);
+            return this;
+        }
+
+        public long getMaxReplicationLagMillis() {
+            return maxReplicationLagMillis;
+        }
+
         public Builder disableBranches() {
             disableBranches = true;
             return this;

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
 Thu Jun 19 09:35:52 2014
@@ -148,6 +148,9 @@ public class DocumentNodeStoreService {
     public static final String PROP_BLOB_GC_MAX_AGE = "blobGcMaxAgeInSecs";
     private long blobGcMaxAgeInSecs = DEFAULT_BLOB_GC_MAX_AGE;
 
+    private static final long DEFAULT_MAX_REPLICATION_LAG = 
TimeUnit.HOURS.toSeconds(6);
+    public static final String PROP_REPLICATION_LAG = 
"maxReplicationLagInSecs";
+    private long maxReplicationLagInSecs = DEFAULT_MAX_REPLICATION_LAG;
 
     @Activate
     protected void activate(ComponentContext context, Map<String, ?> config) 
throws Exception {
@@ -155,6 +158,8 @@ public class DocumentNodeStoreService {
         this.whiteboard = new OsgiWhiteboard(context.getBundleContext());
         this.executor = new WhiteboardExecutor();
         executor.start(whiteboard);
+        this.maxReplicationLagInSecs = 
PropertiesUtil.toLong(config.get(PROP_REPLICATION_LAG),
+                DEFAULT_MAX_REPLICATION_LAG);
 
         if (blobStore == null &&
                 PropertiesUtil.toBoolean(prop(CUSTOM_BLOB_STORE), false)) {
@@ -187,8 +192,9 @@ public class DocumentNodeStoreService {
             // Take care around not logging the uri directly as it
             // might contain passwords
             String type = useMK ? "MK" : "NodeStore";
-            log.info("Starting Document{} with host={}, db={}, cache size 
(MB)={}, Off Heap Cache size (MB)={}, 'changes' collection size (MB)={}",
-                    type, mongoURI.getHosts(), db, cacheSize, offHeapCache, 
changesSize);
+            log.info("Starting Document{} with host={}, db={}, cache size 
(MB)={}, Off Heap Cache size (MB)={}, " +
+                            "'changes' collection size (MB)={}, 
maxReplicationLagInSecs={}",
+                    type, mongoURI.getHosts(), db, cacheSize, offHeapCache, 
changesSize, maxReplicationLagInSecs);
             log.info("Mongo Connection details {}", 
MongoConnection.toString(mongoURI.getOptions()));
         }
 
@@ -205,6 +211,7 @@ public class DocumentNodeStoreService {
             mkBuilder.setBlobStore(blobStore);
         }
 
+        mkBuilder.setMaxReplicationLag(maxReplicationLagInSecs, 
TimeUnit.SECONDS);
         mkBuilder.setMongoDB(mongoDB, changesSize);
         mkBuilder.setExecutor(executor);
         mk = mkBuilder.open();

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
 Thu Jun 19 09:35:52 2014
@@ -1080,7 +1080,7 @@ public final class NodeDocument extends 
             return Collections.emptyList();
         }
         if (revision == null) {
-            return new PropertyHistory(store, this, property);
+            return new PropertyHistory(this, property);
         } else {
             final String mainPath = getMainPath();
             // first try to lookup revision directly
@@ -1089,7 +1089,7 @@ public final class NodeDocument extends 
                 Revision r = entry.getKey();
                 int h = entry.getValue().height;
                 String prevId = Utils.getPreviousIdFor(mainPath, r, h);
-                NodeDocument prev = store.find(Collection.NODES, prevId);
+                NodeDocument prev = getPreviousDocument(prevId);
                 if (prev != null) {
                     if (prev.getValueMap(property).containsKey(revision)) {
                         return Collections.singleton(prev);
@@ -1118,6 +1118,12 @@ public final class NodeDocument extends 
         }
     }
 
+    NodeDocument getPreviousDocument(String prevId){
+        //Use the maxAge variant so that, in case of Mongo, calls for
+        //previous docs are directed towards replicas first
+        return store.find(Collection.NODES, prevId, Integer.MAX_VALUE);
+    }
+
     @Nonnull
     Iterator<NodeDocument> getAllPreviousDocs() {
         if (getPreviousRanges().isEmpty()) {
@@ -1147,9 +1153,7 @@ public final class NodeDocument extends 
     private NodeDocument getPreviousDoc(Revision rev, Range range){
         int h = range.height;
         String prevId = Utils.getPreviousIdFor(getMainPath(), rev, h);
-        //TODO Use the maxAge variant such that in case of Mongo call for
-        //previous doc are directed towards replicas first
-        NodeDocument prev = store.find(Collection.NODES, prevId);
+        NodeDocument prev = getPreviousDocument(prevId);
         if (prev != null) {
             return prev;
         } else {

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PropertyHistory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PropertyHistory.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PropertyHistory.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PropertyHistory.java
 Thu Jun 19 09:35:52 2014
@@ -45,16 +45,13 @@ class PropertyHistory implements Iterabl
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PropertyHistory.class);
 
-    private final DocumentStore store;
     private final NodeDocument doc;
     private final String property;
     // path of the main document
     private final String mainPath;
 
-    public PropertyHistory(@Nonnull DocumentStore store,
-                           @Nonnull NodeDocument doc,
+    public PropertyHistory(@Nonnull NodeDocument doc,
                            @Nonnull String property) {
-        this.store = checkNotNull(store);
         this.doc = checkNotNull(doc);
         this.property = checkNotNull(property);
         this.mainPath = doc.getMainPath();
@@ -70,7 +67,7 @@ class PropertyHistory implements Iterabl
                 Revision r = input.getKey();
                 int h = input.getValue().height;
                 String prevId = Utils.getPreviousIdFor(mainPath, r, h);
-                NodeDocument prev = store.find(Collection.NODES, prevId);
+                NodeDocument prev = doc.getPreviousDocument(prevId);
                 if (prev == null) {
                     LOG.warn("Document with previous revisions not found: " + 
prevId);
                     return null;

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
 Thu Jun 19 09:35:52 2014
@@ -37,6 +37,7 @@ import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.QueryBuilder;
 
+import com.mongodb.ReadPreference;
 import org.apache.jackrabbit.oak.cache.CacheValue;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.CachedNodeDocument;
@@ -135,6 +136,7 @@ abstract class CacheInvalidator {
 
             // Fetch lastRev for each such node
             DBCursor cursor = nodes.find(query.get(), keys);
+            cursor.setReadPreference(ReadPreference.primary());
             result.queryCount++;
 
             for (DBObject obj : cursor) {
@@ -222,6 +224,7 @@ abstract class CacheInvalidator {
 
                         // Fetch lastRev and modCount for each such nodes
                         DBCursor cursor = nodes.find(query.get(), keys);
+                        cursor.setReadPreference(ReadPreference.primary());
                         LOG.debug(
                                 "Checking for changed nodes at level {} with 
{} paths",
                                 tn.level(), sameLevelNodes.size());

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java
 Thu Jun 19 09:35:52 2014
@@ -28,7 +28,6 @@ import com.mongodb.DBCollection;
 import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.QueryBuilder;
-import com.mongodb.ReadPreference;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.document.BlobCollector;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -78,7 +77,8 @@ public class MongoBlobReferenceIterator 
                     .is(NodeDocument.HAS_BINARY_VAL)
                     .get();
             //TODO It currently prefers secondary. Would that be Ok?
-            cursor = 
getNodeCollection().find(query).setReadPreference(ReadPreference.secondaryPreferred());
+            cursor = getNodeCollection().find(query)
+                    
.setReadPreference(documentStore.getConfiguredReadPreference(Collection.NODES));
         }
     }
 

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
 Thu Jun 19 09:35:52 2014
@@ -36,9 +36,9 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.base.Splitter;
-
 import com.google.common.collect.Lists;
+import com.mongodb.MongoClientURI;
+import com.mongodb.ReadPreference;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.cache.CacheValue;
@@ -58,6 +58,8 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache;
 import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +76,6 @@ import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.MongoException;
 import com.mongodb.QueryBuilder;
-import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
 
@@ -89,6 +90,13 @@ public class MongoDocumentStore implemen
 
     private static final DBObject BY_ID_ASC = new BasicDBObject(Document.ID, 
1);
 
+    static enum DocumentReadPreference {
+        PRIMARY,
+        PREFER_PRIMARY,
+        PREFER_SECONDARY,
+        PREFER_SECONDARY_IF_OLD_ENOUGH
+    }
+
     public static final int IN_CLAUSE_BATCH_SIZE = 500;
 
     private final DBCollection nodes;
@@ -114,6 +122,10 @@ public class MongoDocumentStore implemen
      */
     private final Comparator<Revision> comparator = 
StableRevisionComparator.REVERSE;
 
+    private Clock clock = Clock.SIMPLE;
+
+    private final long maxReplicationLagMillis;
+
     private String lastReadWriteMode;
 
     public MongoDocumentStore(DB db, DocumentMK.Builder builder) {
@@ -125,6 +137,8 @@ public class MongoDocumentStore implemen
         settings = db.getCollection(
                 Collection.SETTINGS.toString());
 
+        maxReplicationLagMillis = builder.getMaxReplicationLagMillis();
+
         // indexes:
         // the _id field is the primary key, so we don't need to define it
         DBObject index = new BasicDBObject();
@@ -197,9 +211,7 @@ public class MongoDocumentStore implemen
                 .recordStats()
                 .build();
 
-        Cache<CacheValue, NodeDocument> cache =
-                new NodeDocOffHeapCache(primaryCache, listener, builder, this);
-        return cache;
+        return new NodeDocOffHeapCache(primaryCache, listener, builder, this);
     }
 
     private static long start() {
@@ -252,25 +264,32 @@ public class MongoDocumentStore implemen
 
     @Override
     public <T extends Document> T find(Collection<T> collection, String key) {
-        return find(collection, key, Integer.MAX_VALUE);
+        return find(collection, key, true, -1);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <T extends Document> T find(final Collection<T> collection,
                                        final String key,
                                        int maxCacheAge) {
+        return find(collection, key, false, maxCacheAge);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T extends Document> T find(final Collection<T> collection,
+                                       final String key,
+                                       boolean preferCached,
+                                       final int maxCacheAge) {
         if (collection != Collection.NODES) {
-            return findUncached(collection, key);
+            return findUncached(collection, key, 
DocumentReadPreference.PRIMARY);
         }
         CacheValue cacheKey = new StringValue(key);
         NodeDocument doc;
-        if (maxCacheAge > 0) {
+        if (maxCacheAge > 0 || preferCached) {
             // first try without lock
             doc = nodesCache.getIfPresent(cacheKey);
             if (doc != null) {
-                if (maxCacheAge == Integer.MAX_VALUE ||
-                        System.currentTimeMillis() - doc.getCreated() < 
maxCacheAge) {
+                if (preferCached ||
+                        getTime() - doc.getCreated() < maxCacheAge) {
                     if (doc == NodeDocument.NULL) {
                         return null;
                     }
@@ -288,17 +307,17 @@ public class MongoDocumentStore implemen
                     doc = nodesCache.get(cacheKey, new 
Callable<NodeDocument>() {
                         @Override
                         public NodeDocument call() throws Exception {
-                            NodeDocument doc = (NodeDocument) 
findUncached(collection, key);
+                            NodeDocument doc = (NodeDocument) 
findUncached(collection, key, getReadPreference(maxCacheAge));
                             if (doc == null) {
                                 doc = NodeDocument.NULL;
                             }
                             return doc;
                         }
                     });
-                    if (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE) {
+                    if (maxCacheAge == 0 || preferCached) {
                         break;
                     }
-                    if (System.currentTimeMillis() - doc.getCreated() < 
maxCacheAge) {
+                    if (getTime() - doc.getCreated() < maxCacheAge) {
                         break;
                     }
                     // too old: invalidate, try again
@@ -318,12 +337,30 @@ public class MongoDocumentStore implemen
     }
 
     @CheckForNull
-    <T extends Document> T findUncached(Collection<T> collection, String key) {
+    private <T extends Document> T findUncached(Collection<T> collection, 
String key, DocumentReadPreference docReadPref) {
         DBCollection dbCollection = getDBCollection(collection);
         long start = start();
         try {
-            DBObject obj = dbCollection.findOne(getByKeyQuery(key).get());
-            if (obj == null) {
+            ReadPreference readPreference = getMongoReadPreference(collection, 
Utils.getParentId(key), docReadPref);
+
+            if(readPreference.isSlaveOk()){
+                LOG.trace("Routing call to secondary for fetching [{}]", key);
+            }
+
+            DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), 
null, null, readPreference);
+
+            if (obj == null
+                    && readPreference.isSlaveOk()) {
+                //In case secondary read preference is used and node is not 
found
+                //then check with primary again as it might happen that node 
document has not been
+                //replicated. This is required for case like SplitDocument 
where the SplitDoc is fetched with
+                //maxCacheAge == Integer.MAX_VALUE which results in 
readPreference of secondary.
+                //In such a case we know that document with such an id must 
exist
+                //but possibly due to replication lag it has not reached the 
secondary. So in that case read again
+                //from primary
+                obj = dbCollection.findOne(getByKeyQuery(key).get(), null, 
null, ReadPreference.primary());
+            }
+            if(obj == null){
                 return null;
             }
             T doc = convertFromDBObject(collection, obj);
@@ -366,6 +403,16 @@ public class MongoDocumentStore implemen
         long start = start();
         try {
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
+            String parentId = Utils.getParentIdFromLowerLimit(fromKey);
+            ReadPreference readPreference =
+                    getMongoReadPreference(collection, parentId, 
getDefaultReadPreference(collection));
+
+            if(readPreference.isSlaveOk()){
+                LOG.trace("Routing call to secondary for fetching children 
from [{}] to [{}]", fromKey, toKey);
+            }
+
+            cursor.setReadPreference(readPreference);
+
             List<T> list;
             try {
                 list = new ArrayList<T>();
@@ -646,6 +693,66 @@ public class MongoDocumentStore implemen
         }
     }
 
+    DocumentReadPreference getReadPreference(int maxCacheAge){
+        if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) {
+            return DocumentReadPreference.PRIMARY;
+        } else if(maxCacheAge == Integer.MAX_VALUE){
+            return DocumentReadPreference.PREFER_SECONDARY;
+        } else {
+           return DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH;
+        }
+    }
+
+    DocumentReadPreference getDefaultReadPreference(Collection col){
+        return col == Collection.NODES ? 
DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : 
DocumentReadPreference.PRIMARY;
+    }
+
+    <T extends Document> ReadPreference getMongoReadPreference(Collection<T> 
collection,
+                                                               String parentId,
+                                                               
DocumentReadPreference preference) {
+        switch(preference){
+            case PRIMARY:
+                return ReadPreference.primary();
+            case PREFER_PRIMARY :
+                return ReadPreference.primaryPreferred();
+            case PREFER_SECONDARY :
+                return getConfiguredReadPreference(collection);
+            case PREFER_SECONDARY_IF_OLD_ENOUGH:
+                if(collection != Collection.NODES){
+                    return ReadPreference.primary();
+                }
+
+                //Default to primary preferred such that in case primary is 
being elected
+                //we can still read from secondary
+                //TODO REVIEW Would that be safe
+                ReadPreference readPreference = 
ReadPreference.primaryPreferred();
+                if (parentId != null) {
+                    long replicationSafeLimit = getTime() - 
maxReplicationLagMillis;
+                    NodeDocument cachedDoc = (NodeDocument) 
getIfCached(collection, parentId);
+                    if (cachedDoc != null && 
!cachedDoc.hasBeenModifiedSince(replicationSafeLimit)) {
+
+                        //If parent was last modified a long time back then 
its children
+                        //would also not have been modified. In that case we can 
read from secondary
+                        readPreference = 
getConfiguredReadPreference(collection);
+                    }
+                }
+                return readPreference;
+            default:
+                throw new IllegalArgumentException("Unsupported usage " + 
preference);
+        }
+    }
+
+    /**
+     * Retrieves the ReadPreference specified for the Mongo DB in use 
irrespective of
+     * DBCollection. Depending on deployments the user can tweak the default 
preferences
+     * to read from secondary and in that also tag secondaries
+     *
+     * @return db level ReadPreference
+     */
+    ReadPreference getConfiguredReadPreference(Collection collection){
+        return getDBCollection(collection).getReadPreference();
+    }
+
     @CheckForNull
     <T extends Document> T convertFromDBObject(@Nonnull Collection<T> 
collection,
                                                @Nullable DBObject n) {
@@ -946,25 +1053,33 @@ public class MongoDocumentStore implemen
         }
         lastReadWriteMode = readWriteMode;
         try {
-            Map<String, String> map = Splitter.on(", 
").withKeyValueSeparator(":").split(readWriteMode);
-            String read = map.get("read");
-            if (read != null) {
-                ReadPreference readPref = ReadPreference.valueOf(read);
-                if (!readPref.equals(nodes.getReadPreference())) {
-                    nodes.setReadPreference(readPref);
-                    LOG.info("Using ReadPreference " + readPref);
-                }
-            }
-            String write = map.get("write");
-            if (write != null) {
-                WriteConcern writeConcern = WriteConcern.valueOf(write);
-                if (!writeConcern.equals(nodes.getWriteConcern())) {
-                    nodes.setWriteConcern(writeConcern);
-                    LOG.info("Using WriteConcern " + writeConcern);
-                }
+            String rwModeUri = readWriteMode;
+            if(!readWriteMode.startsWith("mongodb://")){
+                rwModeUri = String.format("mongodb://localhost/?%s", 
readWriteMode);
+            }
+            MongoClientURI uri = new MongoClientURI(rwModeUri);
+            ReadPreference readPref = uri.getOptions().getReadPreference();
+
+            if (!readPref.equals(nodes.getReadPreference())) {
+                nodes.setReadPreference(readPref);
+                LOG.info("Using ReadPreference {} ",readPref);
+            }
+
+            WriteConcern writeConcern = uri.getOptions().getWriteConcern();
+            if (!writeConcern.equals(nodes.getWriteConcern())) {
+                nodes.setWriteConcern(writeConcern);
+                LOG.info("Using WriteConcern " + writeConcern);
             }
         } catch (Exception e) {
             LOG.error("Error setting readWriteMode " + readWriteMode, e);
         }
     }
+
+    private long getTime() {
+        return clock.getTime();
+    }
+
+    void setClock(Clock clock) {
+        this.clock = clock;
+    }
 }
\ No newline at end of file

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
 Thu Jun 19 09:35:52 2014
@@ -66,8 +66,7 @@ public class MongoMissingLastRevSeeker e
         DBCursor cursor =
                 getNodeCollection().find(query)
                         .sort(sortFields)
-                        .setReadPreference(
-                                ReadPreference.secondaryPreferred());
+                        .setReadPreference(ReadPreference.primary());
         return CloseableIterable.wrap(transform(cursor, new Function<DBObject, 
NodeDocument>() {
             @Override
             public NodeDocument apply(DBObject input) {

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java
 Thu Jun 19 09:35:52 2014
@@ -116,7 +116,7 @@ public class MongoVersionGCSupport exten
         final BasicDBObject keys = new BasicDBObject(Document.ID, 1);
         List<String> ids;
         DBCursor cursor = getNodeCollection().find(query, keys)
-                .setReadPreference(ReadPreference.secondaryPreferred());
+                
.setReadPreference(store.getConfiguredReadPreference(Collection.NODES));
         try {
              ids = ImmutableList.copyOf(Iterables.transform(cursor, new 
Function<DBObject, String>() {
                  @Override

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
 Thu Jun 19 09:35:52 2014
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -61,14 +62,14 @@ public class Utils {
      * possibly be too large to be used for the primary key for the document
      * store.
      */
-    private static final int PATH_SHORT = Integer.getInteger("oak.pathShort", 
165);
+    static final int PATH_SHORT = Integer.getInteger("oak.pathShort", 165);
 
     /**
      * The maximum length of the parent path, in bytes. If the parent path is
      * longer, then the id of a document is no longer the path, but the hash of
      * the parent, and then the node name.
      */
-    private static final int PATH_LONG = Integer.getInteger("oak.pathLong", 
350);
+    static final int PATH_LONG = Integer.getInteger("oak.pathLong", 350);
 
     /**
      * The maximum size a node name, in bytes. This is only a problem for long 
path.
@@ -250,6 +251,31 @@ public class Utils {
         return depth + ":" + path;
     }
 
+    /**
+     * Returns the parent id for the given id, if it can be determined.
+     *
+     * <p>Returns {@code null} in the following cases:
+     * <ul>
+     *     <li>If id is from long path</li>
+     *     <li>If id is for root path</li>
+     * </ul>
+     *</p>
+     * @param id id for which parent id needs to be determined
+     * @return parent id. null if parent id cannot be determined
+     */
+    @CheckForNull
+    public static String getParentId(String id){
+        if(Utils.isIdFromLongPath(id)){
+            return null;
+        }
+        String path = Utils.getPathFromId(id);
+        if(PathUtils.denotesRoot(path)){
+            return null;
+        }
+        String parentPath = PathUtils.getParentPath(path);
+        return Utils.getIdFromPath(parentPath);
+    }
+
     public static boolean isLongPath(String path) {
         // the most common case: a short path
         // avoid calculating the parent path
@@ -354,6 +380,23 @@ public class Utils {
     }
 
     /**
+     * Returns parentId extracted from the fromKey. fromKey is usually 
constructed
+     * using Utils#getKeyLowerLimit
+     *
+     * @param fromKey key used as start key in queries
+     * @return parentId if possible.
+     */
+    @CheckForNull
+    public static String getParentIdFromLowerLimit(String fromKey){
+        //If key just ends with slash 2:/foo/ then append a fake
+        //name to create a proper id
+        if(fromKey.endsWith("/")){
+            fromKey = fromKey + "a";
+        }
+        return getParentId(fromKey);
+    }
+
+    /**
      * Returns <code>true</code> if a revision tagged with the given revision
      * should be considered committed, <code>false</code> otherwise. Committed
      * revisions have a tag, which equals 'c' or starts with 'c-'.

Copied: 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
 (from r1594161, 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java)
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java?p2=jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java&r1=1594161&r2=1603784&rev=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
 Thu Jun 19 09:35:52 2014
@@ -43,6 +43,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.Collection.SETTINGS;
 import static 
org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore.DocumentReadPreference;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ReadPreferenceIT {
 
@@ -66,8 +67,7 @@ public class ReadPreferenceIT {
         replicationLag = TimeUnit.SECONDS.toMillis(10);
         MongoConnection mc = MongoUtils.getConnection();
         documentNodeStore = new DocumentMK.Builder()
-                .clock(clock)
-                .setMaxReplicationLag(replicationLag, TimeUnit.HOURS)
+                .setMaxReplicationLag(replicationLag, TimeUnit.MILLISECONDS)
                 .setMongoDB(mc.getDB())
                 .setClusterId(1)
                 .getNodeStore();
@@ -136,17 +136,32 @@ public class ReadPreferenceIT {
         nodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
 
         String id = Utils.getIdFromPath("/x/y");
+        String parentId = Utils.getParentId(id);
         mongoDS.invalidateCache(NODES,id);
 
         //For modifiedTime < replicationLag primary should be preferred
         assertEquals(ReadPreference.primaryPreferred(),
-                mongoDS.getMongoReadPreference(NODES,id, 
DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+                mongoDS.getMongoReadPreference(NODES,parentId, 
DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
 
         //Going into future to make parent /x old enough
         clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag);
+        mongoDS.setClock(clock);
 
         //For old modified nodes secondaries should be preferred
         assertEquals(testPref,
-                mongoDS.getMongoReadPreference(NODES,id, 
DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+                mongoDS.getMongoReadPreference(NODES, parentId, 
DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
+    }
+
+    @Test
+    public void testReadWriteMode() throws Exception{
+        assertEquals(ReadPreference.primary(), 
mongoDS.getConfiguredReadPreference(NODES));
+
+        
mongoDS.setReadWriteMode("readPreference=secondary&w=2&safe=true&j=true");
+
+        assertEquals(ReadPreference.secondary(), 
mongoDS.getDBCollection(NODES).getReadPreference());
+        assertEquals(2, 
mongoDS.getDBCollection(NODES).getWriteConcern().getW());
+        assertTrue(mongoDS.getDBCollection(NODES).getWriteConcern().getJ());
+
+        assertEquals(ReadPreference.secondary(), 
mongoDS.getConfiguredReadPreference(NODES));
     }
 }

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
 Thu Jun 19 09:35:52 2014
@@ -16,11 +16,15 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.util;
 
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link Utils}.
@@ -38,6 +42,23 @@ public class UtilsTest {
                 Utils.getPreviousIdFor("/a/b/c/d/e/f/g/h/i/j/k/l/m", r, 3));
     }
 
+    @Test
+    public void getParentIdFromLowerLimit() throws Exception{
+        
assertEquals("1:/foo",Utils.getParentIdFromLowerLimit(Utils.getKeyLowerLimit("/foo")));
+        assertEquals("1:/foo",Utils.getParentIdFromLowerLimit("2:/foo/bar"));
+    }
+
+    @Test
+    public void getParentId() throws Exception{
+        String longPath = PathUtils.concat("/"+Strings.repeat("p", 
Utils.PATH_LONG + 1), "foo");
+        assertTrue(Utils.isLongPath(longPath));
+
+        assertNull(Utils.getParentId(Utils.getIdFromPath(longPath)));
+
+        assertNull(Utils.getParentId(Utils.getIdFromPath("/")));
+        assertEquals("1:/foo",Utils.getParentId("2:/foo/bar"));
+    }
+
     @Ignore("Performance test")
     @Test
     public void performance_getPreviousIdFor() {

Propchange: jackrabbit/oak/branches/1.0/oak-doc/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk/oak-doc:r1594808

Modified: jackrabbit/oak/branches/1.0/oak-doc/src/site/markdown/documentmk.md
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-doc/src/site/markdown/documentmk.md?rev=1603784&r1=1603783&r2=1603784&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-doc/src/site/markdown/documentmk.md 
(original)
+++ jackrabbit/oak/branches/1.0/oak-doc/src/site/markdown/documentmk.md Thu Jun 
19 09:35:52 2014
@@ -414,39 +414,45 @@ or a random uuid if this is not availabl
 The `info` contains the same info as a string, plus additionally the process id
 and the uuid.
 
-### Changing the Read Preference and Write Concern at Runtime
+### Specifying the Read Preference and Write Concern
+
+With `MongoDocumentStore` you can specify the [read preference][1] and 
[write concern][2]. 
+This can be enabled in Oak via two modes. 
+
+Note that `MongoDocumentStore` might still use a predefined read preference 
such as primary 
+wherever required. So if some code path, like reading the latest `_lastRev` 
of the root node, 
+requires that the read is performed from the primary (for consistency), then the code 
explicitly 
+uses the read preference primary for that operation. For all other operations 
the Mongo Java Driver 
+uses its default settings, where the read preference is set to `Primary` and the write 
concern is set to `Acknowledged`. 
+Using one of the two modes below, a user can tune the default settings as 
needed.
+
+#### Via Configuration
+
+In this mode the config is specified as part of the Mongo URI (see 
[configuration](osgi_config.html#document-node-store)). 
+So if a user wants reads from secondaries to prefer a secondary with the 
tag _dc:ny,rack:1_, 
+and otherwise go to another secondary, this can be specified via the following 
Mongo URI:
+
+    
mongodb://example1.com,example2.com,example3.com/?readPreference=secondary&readPreferenceTags=dc:ny,rack:1&readPreferenceTags=dc:ny&readPreferenceTags=
 
+
+Refer to [Read Preference Options][3] and [Write Concern Options][4] for more 
details.  
+ 
+#### Changing at Runtime
 
 The read preference and write concern of all cluster nodes can be changed at 
runtime
 without having to restart the instances, by setting the property 
`readWriteMode` of
 this collection. All cluster nodes will pick up the change within one minute 
 (when they renew the lease of the cluster node id). This is a string property 
with the
-format `'read:<readPreference>, write:<writeConcern>'` (please note the space 
after 
-the comma, and no spaces before and after the colon). The following shell 
command will
+format `'readPreference=<preference>&w=<writeConcern>'`, similar to the way it 
is used in the Mongo URI, 
+except that it does not include other option details. The following shell 
command will
 set the read preference to `primary` and the write concern to `majority` for 
all
 cluster nodes:
 
     > db.clusterNodes.update({}, 
-      {$set: {readWriteMode:'read:primary, write:majority'}}, 
+      {$set: {readWriteMode:'readPreference=primary&w=majority'}}, 
       {multi: true})    
 
-License
--------
-
-(see the top-level [LICENSE.txt](../LICENSE.txt) for full license details)
-
-Collective work: Copyright 2013 The Apache Software Foundation.
+[1]: http://docs.mongodb.org/manual/core/read-preference/
+[2]: http://docs.mongodb.org/manual/core/write-concern/
+[3]: 
http://docs.mongodb.org/manual/reference/connection-string/#read-preference-options
+[4]: 
http://docs.mongodb.org/manual/reference/connection-string/#write-concern-options
 
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.


Reply via email to