Author: reschke
Date: Wed Jun 17 19:12:23 2015
New Revision: 1686097

URL: http://svn.apache.org/r1686097
Log:
OAK-2987: RDBDocumentStore: use batched prepared statements for inserts, increase test coverage

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1686097&r1=1686096&r2=1686097&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Wed Jun 17 19:12:23 2015
@@ -1016,6 +1016,7 @@ public class RDBDocumentStore implements
     @CheckForNull
     private <T extends Document> boolean internalCreate(Collection<T> 
collection, List<UpdateOp> updates) {
         try {
+            boolean success = true;
             // try up to CHUNKSIZE ops in one transaction
             for (List<UpdateOp> chunks : Lists.partition(updates, CHUNKSIZE)) {
                 List<T> docs = new ArrayList<T>();
@@ -1032,12 +1033,17 @@ public class RDBDocumentStore implements
                     }
                     docs.add(doc);
                 }
-                insertDocuments(collection, docs);
-                for (T doc : docs) {
-                    addToCache(collection, doc);
+                boolean done = insertDocuments(collection, docs);
+                if (done) {
+                    for (T doc : docs) {
+                        addToCache(collection, doc);
+                    }
+                }
+                else {
+                    success = false;
                 }
             }
-            return true;
+            return success;
         } catch (DocumentStoreException ex) {
             return false;
         }
@@ -1442,26 +1448,15 @@ public class RDBDocumentStore implements
         return op == null ? 0L : Long.parseLong(op.value.toString());
     }
 
-    private <T extends Document> void insertDocuments(Collection<T> 
collection, List<T> documents) {
+    private <T extends Document> boolean insertDocuments(Collection<T> 
collection, List<T> documents) {
         Connection connection = null;
         String tableName = getTable(collection);
         List<String> ids = new ArrayList<String>();
         try {
             connection = this.ch.getRWConnection();
-            for (T document : documents) {
-                String data = SR.asString(document);
-                Long modified = (Long) document.get(MODIFIED);
-                Number flagB = (Number) 
document.get(NodeDocument.HAS_BINARY_FLAG);
-                Boolean hasBinary = flagB != null && flagB.intValue() == 
NodeDocument.HAS_BINARY_VAL;
-                Boolean flagD = (Boolean) 
document.get(NodeDocument.DELETED_ONCE);
-                Boolean deletedOnce = flagD != null && flagD.booleanValue();
-                Long modcount = (Long) document.get(MODCOUNT);
-                Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
-                String id = document.getId();
-                ids.add(id);
-                dbInsert(connection, tableName, id, modified, hasBinary, 
deletedOnce, modcount, cmodcount, data);
-            }
+            boolean result = dbInsert(connection, tableName, documents);
             connection.commit();
+            return result;
         } catch (SQLException ex) {
             LOG.debug("insert of " + ids + " failed", ex);
             this.ch.rollbackConnection(connection);
@@ -1810,32 +1805,48 @@ public class RDBDocumentStore implements
         }
     }
 
-    private boolean dbInsert(Connection connection, String tableName, String 
id, Long modified, Boolean hasBinary,
-            Boolean deletedOnce, Long modcount, Long cmodcount, String data) 
throws SQLException {
-        PreparedStatement stmt = connection.prepareStatement("insert into " + 
tableName
-                + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, 
DSIZE, DATA, BDATA) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
+    private <T extends Document> boolean dbInsert(Connection connection, 
String tableName, List<T> documents) throws SQLException {
+
+        PreparedStatement stmt = connection.prepareStatement("insert into " + 
tableName +
+                "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, 
DSIZE, DATA, BDATA) " +
+                "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
+
         try {
-            int si = 1;
-            setIdInStatement(stmt, si++, id);
-            stmt.setObject(si++, modified, Types.BIGINT);
-            stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
-            stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
-            stmt.setObject(si++, modcount, Types.BIGINT);
-            stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : 
cmodcount, Types.BIGINT);
-            stmt.setObject(si++, data.length(), Types.BIGINT);
-            if (data.length() < this.dataLimitInOctets / CHAR2OCTETRATIO) {
-                stmt.setString(si++, data);
-                stmt.setBinaryStream(si++, null, 0);
-            } else {
-                stmt.setString(si++, "\"blob\"");
-                byte[] bytes = asBytes(data);
-                stmt.setBytes(si++, bytes);
-            }
-            int result = stmt.executeUpdate();
-            if (result != 1) {
-                LOG.debug("DB insert failed for " + tableName + "/" + id);
+            for (T document : documents) {
+                String data = SR.asString(document);
+                String id = document.getId();
+                Number hasBinary = (Number) 
document.get(NodeDocument.HAS_BINARY_FLAG);
+                Boolean deletedOnce = (Boolean) 
document.get(NodeDocument.DELETED_ONCE);
+                Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+
+                int si = 1;
+                setIdInStatement(stmt, si++, id);
+                stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT);
+                stmt.setObject(si++, (hasBinary != null && 
hasBinary.intValue() == NodeDocument.HAS_BINARY_VAL) ? 1 : 0, Types.SMALLINT);
+                stmt.setObject(si++, (deletedOnce != null && deletedOnce) ? 1 
: 0, Types.SMALLINT);
+                stmt.setObject(si++, document.get(MODCOUNT), Types.BIGINT);
+                stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : 
cmodcount, Types.BIGINT);
+                stmt.setObject(si++, data.length(), Types.BIGINT);
+                if (data.length() < this.dataLimitInOctets / CHAR2OCTETRATIO) {
+                    stmt.setString(si++, data);
+                    stmt.setBinaryStream(si++, null, 0);
+                } else {
+                    stmt.setString(si++, "\"blob\"");
+                    byte[] bytes = asBytes(data);
+                    stmt.setBytes(si++, bytes);
+                }
+                stmt.addBatch();
+            }
+            int[] results = stmt.executeBatch();
+            boolean success = true;
+            for (int i = 0; i < documents.size(); i++) {
+                int result = results[i];
+                if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
+                    LOG.error("DB insert failed for {}: {}", tableName, 
documents.get(i).getId());
+                    success = false;
+                }
             }
-            return result == 1;
+            return success;
         } finally {
             stmt.close();
         }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1686097&r1=1686096&r2=1686097&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Wed Jun 17 19:12:23 2015
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -224,6 +225,30 @@ public class BasicDocumentStoreTest exte
     }
 
     @Test
+    public void testCreatePartialFailure() {
+        String id = this.getClass().getName() + ".testCreatePartialFailure";
+
+        // remove if present
+        NodeDocument nd = super.ds.find(Collection.NODES, id);
+        if (nd != null) {
+            super.ds.remove(Collection.NODES, id);
+        }
+
+        // create a test node
+        UpdateOp up = new UpdateOp(id, true);
+        up.set("_id", id);
+        boolean success = super.ds.create(Collection.NODES, 
Collections.singletonList(up));
+        assertTrue(success);
+        removeMe.add(id);
+
+        // try to create two nodes, of which one exists
+        UpdateOp up2 = new UpdateOp(id + "2", true);
+        up2.set("_id", id + "2");
+        success = super.ds.create(Collection.NODES, Arrays.asList(new 
UpdateOp[] { up, up2 }));
+        assertFalse(success);
+    }
+
+    @Test
     public void testDeleteNonExisting() {
         String id = this.getClass().getName() + ".testDeleteNonExisting-" + 
UUID.randomUUID();
         // delete is best effort

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java?rev=1686097&r1=1686096&r2=1686097&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStorePerformanceTest.java Wed Jun 17 19:12:23 2015
@@ -55,6 +55,11 @@ public class DocumentStorePerformanceTes
     }
 
     @Test
+    public void testCreatePerfSmallBatch2() {
+        createPerf(16, 256);
+    }
+
+    @Test
     public void testCreatePerfBig() {
         createPerf(32 * 1024, 1);
     }
@@ -82,7 +87,8 @@ public class DocumentStorePerformanceTes
             cnt += 1;
         }
 
-        LOG.info("document creation with property of size " + size + " and 
batch size " + amount + " for " + super.dsname + " was " + cnt + " in " + 
duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
+        LOG.info("document creation with property of size " + size + " and 
batch size " + amount + " for " + super.dsname + " was "
+                + cnt + " in " + duration + "ms (" + (cnt / (duration / 
1000f)) + "/s)");
     }
 
     @Test


Reply via email to