Author: reschke
Date: Thu Sep 27 12:25:22 2018
New Revision: 1842089

URL: http://svn.apache.org/viewvc?rev=1842089&view=rev
Log:
OAK-7748: DocumentStore: test (and optionally optimize) bulk update fallback logic
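
For readers following along: in the bulk createOrUpdate path, operations whose
documents were modified concurrently fall back to per-document updates. The sketch
below shows the shape of that fallback, which the first hunk further down now
instruments with DEBUG logging and a NODES cache invalidation. It is a minimal
illustration, not the committed RDBDocumentStore code: the class name and the
helper retryConflicted are made up for this mail, and it invalidates through the
public DocumentStore API instead of the store-internal nodes cache.

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.jackrabbit.oak.plugins.document.Collection;
    import org.apache.jackrabbit.oak.plugins.document.Document;
    import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
    import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative sketch only -- not the committed code.
    class BulkUpdateFallbackSketch {
        private static final Logger LOG = LoggerFactory.getLogger(BulkUpdateFallbackSketch.class);

        // Retry each conflicted op individually; for NODES, drop the cached document
        // first so the retry reads the version written by the concurrent updater.
        static <T extends Document> Map<UpdateOp, T> retryConflicted(DocumentStore store,
                Collection<T> collection, List<UpdateOp> conflicted) {
            Map<UpdateOp, T> results = new LinkedHashMap<>();
            for (UpdateOp op : conflicted) {
                if (collection == Collection.NODES) {
                    LOG.debug("update conflict on {}, invalidating cache and retrying...", op.getId());
                    store.invalidateCache(collection, op.getId());
                } else {
                    LOG.debug("update conflict on {}, retrying...", op.getId());
                }
                // fall back to the single-document createOrUpdate for the conflicted op
                results.put(op, store.createOrUpdate(collection, op));
            }
            return results;
        }
    }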

Modified:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1842089&r1=1842088&r2=1842089&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Thu Sep 27 12:25:22 2018
@@ -423,6 +423,12 @@ public class RDBDocumentStore implements
         for (UpdateOp updateOp : updateOps) {
             UpdateOp conflictedOp = operationsToCover.remove(updateOp.getId());
             if (conflictedOp != null) {
+                if (collection == Collection.NODES) {
+                    LOG.debug("update conflict on {}, invalidating cache and 
retrying...", updateOp.getId());
+                    nodesCache.invalidate(updateOp.getId());
+                } else {
+                    LOG.debug("update conflict on {}, retrying...", 
updateOp.getId());
+                }
                 results.put(conflictedOp, createOrUpdate(collection, updateOp));
             } else if (duplicates.contains(updateOp)) {
                 results.put(updateOp, createOrUpdate(collection, updateOp));
@@ -1621,10 +1627,17 @@ public class RDBDocumentStore implements
                             if (lastmodcount == newmodcount) {
                                 // cached copy did not change so it probably was
                                 // updated by a different instance, get a fresh one
+                                LOG.debug("suspect update from different 
instance (current modcount: {}), refetching: {}...",
+                                        newmodcount, update.getId());
                                 if (collection == Collection.NODES) {
                                     nodesCache.invalidate(update.getId());
                                 }
                                 oldDoc = readDocumentUncached(collection, update.getId(), null);
+                                if (oldDoc == null) {
+                                    LOG.debug("after refetch: {} is gone", 
update.getId());
+                                } else {
+                                    LOG.debug("after refetch: modcount for {} 
is {}", update.getId(), modcountOf(oldDoc));
+                                }
                             }
                         }
 

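The second hunk above instruments the branch where a conditional update failed
although the document's modcount still equals the one the cached copy was based
on; as the existing comment says, the document was then most likely updated by a
different cluster instance, so the cache entry is dropped and the document is
re-read before the retry. Below is a rough, self-contained rendering of that
branch using only the public DocumentStore API; class and method names are
illustrative, and the committed code works on the store-internal nodes cache and
readDocumentUncached instead.

    import org.apache.jackrabbit.oak.plugins.document.Collection;
    import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
    import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative sketch only -- not the committed code.
    class StaleCacheRefetchSketch {
        private static final Logger LOG = LoggerFactory.getLogger(StaleCacheRefetchSketch.class);

        // cachedModCount: the _modCount the failed conditional update was based on;
        // backendModCount: the _modCount the backend currently reports for the document.
        static NodeDocument refetchIfSuspectedStale(DocumentStore store, String id,
                Long cachedModCount, Long backendModCount) {
            if (cachedModCount != null && cachedModCount.equals(backendModCount)) {
                // cached copy did not change, so the document was probably updated by a
                // different instance: invalidate the cache entry and get a fresh copy
                LOG.debug("suspect update from different instance (current modcount: {}), refetching: {}...",
                        backendModCount, id);
                store.invalidateCache(Collection.NODES, id);
                NodeDocument fresh = store.find(Collection.NODES, id, 0 /* do not accept a cached copy */);
                if (fresh == null) {
                    LOG.debug("after refetch: {} is gone", id);
                } else {
                    LOG.debug("after refetch: modcount for {} is {}", id, fresh.getModCount());
                }
                return fresh;
            }
            return null;
        }
    }
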
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java?rev=1842089&r1=1842088&r2=1842089&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java Thu Sep 27 12:25:22 2018
@@ -31,12 +31,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
+import ch.qos.logback.classic.Level;
+
 public class BulkCreateOrUpdateClusterTest extends AbstractMultiDocumentStoreTest {
-    
+
     final Logger logger = LoggerFactory.getLogger(getClass());
 
     public BulkCreateOrUpdateClusterTest(DocumentStoreFixture dsf) {
@@ -202,4 +208,64 @@ public class BulkCreateOrUpdateClusterTe
         }
     }
 
+    @Test
+    public void testSimpleConflictHandling() {
+        LogCustomizer logCustomizer = LogCustomizer.forLogger(RDBDocumentStore.class.getName()).enable(Level.DEBUG)
+                .contains("invalidating cache and retrying").create();
+        logCustomizer.starting();
+
+        try {
+            String id1 = this.getClass().getName() + ".testSimpleConflictHandling1";
+            String id2 = this.getClass().getName() + ".testSimpleConflictHandling2";
+            String id3 = this.getClass().getName() + ".testSimpleConflictHandling3";
+
+            removeMe.add(id1);
+            removeMe.add(id2);
+            removeMe.add(id3);
+
+            {
+                UpdateOp op1a = new UpdateOp(id1, true);
+                op1a.set("foo", 1);
+                UpdateOp op2a = new UpdateOp(id2, true);
+                op2a.set("foo", 1);
+                UpdateOp op3a = new UpdateOp(id3, true);
+                op3a.set("foo", 1);
+
+                List<NodeDocument> resulta = ds1.createOrUpdate(Collection.NODES, Lists.newArrayList(op1a, op2a, op3a));
+                assertEquals(3, resulta.size());
+            }
+
+            {
+                UpdateOp op2b = new UpdateOp(id2, false);
+                op2b.increment("foo", 1);
+                NodeDocument prev2 = ds2.createOrUpdate(Collection.NODES, op2b);
+                assertNotNull(prev2);
+                assertEquals(1L, ((Long)prev2.get("foo")).longValue());
+            }
+
+            {
+                UpdateOp op1c = new UpdateOp(id1, true);
+                op1c.increment("foo", 1);
+                UpdateOp op2c = new UpdateOp(id2, true);
+                op2c.increment("foo", 1);
+                UpdateOp op3c = new UpdateOp(id3, true);
+                op3c.increment("foo", 1);
+
+                List<NodeDocument> resultc = ds1.createOrUpdate(Collection.NODES, Lists.newArrayList(op1c, op2c, op3c));
+                assertEquals(3, resultc.size());
+                for (NodeDocument d : resultc) {
+                    Long fooval = (Long) d.get("foo");
+                    assertEquals((d.getId().equals(id2)) ? 2L : 1L, fooval.longValue());
+                }
+            }
+
+            if (ds1 instanceof RDBDocumentStore) {
+                // for RDB, verify that the cache invalidation was reached
+                assertEquals(1, logCustomizer.getLogs().size());
+            }
+        }
+        finally {
+            logCustomizer.finished();
+        }
+    }
 }

