Author: reschke
Date: Fri Oct 10 14:08:50 2014
New Revision: 1630901

URL: http://svn.apache.org/r1630901
Log:
OAK-1941 - tune table model to persist base JSON + serialized updates
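
For illustration only (the property names, values, and revision strings below are invented, not taken from the commit): for a small document that has no BDATA, the new DATA layout holds the base JSON serialization followed by comma-separated batches of serialized update operations, and can be read back by wrapping the cell in brackets, e.g. with the json-simple parser the store already uses:

import java.util.Map;

import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;

public class DataColumnExample {
    public static void main(String[] args) throws Exception {
        String data = "{\"prop\":\"v0\"}"                    // base JSON serialization
                + ",[[\"=\",\"prop\",\"r1-0-1\",\"v1\"]]"     // appended update batch #1
                + ",[[\"+\",\"c\",1],[\"M\",\"max\",42]]";    // appended update batch #2
        JSONArray arr = (JSONArray) new JSONParser().parse("[" + data + "]");
        Map<String, Object> base = (Map<String, Object>) arr.get(0);
        System.out.println(base);           // {"prop":"v0"}
        System.out.println(arr.size() - 1); // two update batches left to replay
    }
}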

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1630901&r1=1630900&r2=1630901&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Fri Oct 10 14:08:50 2014
@@ -67,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
@@ -135,13 +136,14 @@ import com.google.common.util.concurrent
  * <tr>
  * <th>DSIZE</th>
  * <td>bigint</td>
- * <td>the size of the document's JSON serialization (for debugging purposes)</td>
+ * <td>the approximate size of the document's JSON serialization (for debugging purposes)</td>
  * </tr>
  * <tr>
  * <th>DATA</th>
  * <td>varchar(16384)</td>
 * <td>the document's JSON serialization (only used for small document sizes, in
- * which case BDATA (below) is not set</td>
+ * which case BDATA (below) is not set), or a sequence of JSON serialized update operations
+ * to be applied against the last full serialization</td>
  * </tr>
  * <tr>
  * <th>BDATA</th>
@@ -320,6 +322,7 @@ public class RDBDocumentStore implements
 
     private static final String MODIFIED = "_modified";
     private static final String MODCOUNT = "_modCount";
+    private static final String ID = "_id";
 
     private static final Logger LOG = LoggerFactory.getLogger(RDBDocumentStore.class);
 
@@ -350,7 +353,7 @@ public class RDBDocumentStore implements
             NodeDocument.HAS_BINARY_FLAG }));
 
     // set of properties not serialized to JSON
-    private static Set<String> COLUMNPROPERTIES = new HashSet<String>(Arrays.asList(new String[] { "_id", MODIFIED }));
+    private static Set<String> COLUMNPROPERTIES = new HashSet<String>(Arrays.asList(new String[] { ID, MODIFIED, MODCOUNT }));
 
     private void initialize(DataSource ds, DocumentMK.Builder builder, RDBOptions options) throws Exception {
 
@@ -592,7 +595,7 @@ public class RDBDocumentStore implements
                 int retries = maxRetries;
                 while (!success && retries > 0) {
                     long lastmodcount = (Long) oldDoc.get(MODCOUNT);
-                    success = updateDocument(collection, doc, lastmodcount);
+                    success = updateDocument(collection, doc, update, lastmodcount);
                     if (!success) {
                         retries -= 1;
                         oldDoc = readDocumentCached(collection, update.getId(), Integer.MAX_VALUE);
@@ -701,11 +704,27 @@ public class RDBDocumentStore implements
 
     private <T extends Document> T fromDBRow(Collection<T> collection, DBRow row) throws ParseException {
         T doc = collection.newDocument(this);
-        doc.put("_id", row.id);
-        doc.put("_modified", row.modified);
+        doc.put(ID, row.id);
+        doc.put(MODIFIED, row.modified);
+        doc.put(MODCOUNT, row.modcount);
 
-        Map<String, Object> obj = (Map<String, Object>) new JSONParser().parse(row.data);
-        for (Map.Entry<String, Object> entry : obj.entrySet()) {
+        JSONParser jp = new JSONParser();
+
+        Map<String, Object> baseData = null;
+        if (row.bdata != null && row.bdata.length != 0) {
+            baseData = (Map<String, Object>) jp.parse(fromBlobData(row.bdata));
+        }
+        // TODO figure out a faster way
+        JSONArray arr = (JSONArray) new JSONParser().parse("[" + row.data + "]");
+
+        int updatesStartAt = 0;
+        if (baseData == null) {
+            // if we do not have a blob, the first part of the string data is the base JSON
+            baseData = (Map<String, Object>)arr.get(0);
+            updatesStartAt = 1; 
+        }
+
+        for (Map.Entry<String, Object> entry : baseData.entrySet()) {
             String key = entry.getKey();
             Object value = entry.getValue();
             if (value == null) {
@@ -719,6 +738,79 @@ public class RDBDocumentStore implements
                 throw new RuntimeException("unexpected type: " + value.getClass());
             }
         }
+
+        for (int u = updatesStartAt; u < arr.size(); u++) {
+            Object ob = arr.get(u);
+            if (ob instanceof JSONArray) {
+                JSONArray update = (JSONArray)ob;
+                for (int o = 0; o < update.size(); o++) {
+                    JSONArray op = (JSONArray)update.get(o);
+                    String opcode = op.get(0).toString();
+                    String key = op.get(1).toString();
+                    Revision rev = null;
+                    Object value = null;
+                    if (op.size() == 3) {
+                        value = op.get(2);
+                    }
+                    else {
+                        rev = Revision.fromString(op.get(2).toString());
+                        value = op.get(3);
+                    }
+                    Object old = doc.get(key);
+
+                    if ("=".equals(opcode)) {
+                        if (rev == null) {
+                            doc.put(key, value);
+                        } else {
+                            @SuppressWarnings("unchecked")
+                            Map<Revision, Object> m = (Map<Revision, Object>) old;
+                            if (m == null) {
+                                m = new TreeMap<Revision, Object>(comparator);
+                                doc.put(key, m);
+                            }
+                            m.put(rev, value);
+                        }
+                    } else if ("*".equals(opcode)) {
+                        if (rev == null) {
+                            throw new RuntimeException("unexpected operation " + op + " in: " + ob);
+                        } else {
+                            @SuppressWarnings("unchecked")
+                            Map<Revision, Object> m = (Map<Revision, Object>) old;
+                            if (m != null) {
+                                m.remove(rev);
+                            }
+                        }
+                    } else if ("+".equals(opcode)) {
+                        if (rev == null) {
+                            Long x = (Long) value;
+                            if (old == null) {
+                                old = 0L;
+                            }
+                            doc.put(key, ((Long) old) + x);
+                        } else {
+                            throw new RuntimeException("unexpected operation " + op + " in: " + ob);
+                        }
+                    } else if ("M".equals(opcode)) {
+                        if (rev == null) {
+                            Comparable newValue = (Comparable) value;
+                            if (old == null || newValue.compareTo(old) > 0) {
+                                doc.put(key, value);
+                            }
+                        } else {
+                            throw new RuntimeException("unexpected operation " + op + " in: " + ob);
+                        }
+                    } else {
+                        throw new RuntimeException("unexpected operation " + op + " in: " + ob);
+                    }
+                }
+            }
+            else if (ob.toString().equals("blob") && u == 0) {
+                // expected placeholder
+            }
+            else {
+                throw new RuntimeException("unexpected: " + ob);
+            }
+        }
         return doc;
     }
 
@@ -778,19 +870,40 @@ public class RDBDocumentStore implements
         }
     }
 
-    private <T extends Document> boolean updateDocument(@Nonnull Collection<T> collection, @Nonnull T document, Long oldmodcount) {
+    private <T extends Document> boolean updateDocument(@Nonnull Collection<T> collection, @Nonnull T document,
+            @Nonnull UpdateOp update, Long oldmodcount) {
         Connection connection = null;
         String tableName = getTable(collection);
         try {
             connection = getConnection();
-            String data = asString(document);
             Long modified = (Long) document.get(MODIFIED);
             Number flag = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
             Boolean hasBinary = flag == null ? false : flag.intValue() == NodeDocument.HAS_BINARY_VAL;
             Long modcount = (Long) document.get(MODCOUNT);
             Long cmodcount = (Long) document.get(NodeDocument.COLLISIONSMODCOUNT);
-            boolean success = dbUpdate(connection, tableName, document.getId(), modified, hasBinary, modcount, cmodcount, oldmodcount, data);
-            connection.commit();
+            boolean success = false;
+
+            // every 16th update is a full rewrite
+            if (isAppendableUpdate(update) && modcount % 16 != 0) {
+                String appendData = asString(update);
+                if (appendData.length() < datalimit) {
+                    try {
+                        success = dbAppendingUpdate(connection, tableName, document.getId(), modified, hasBinary, modcount,
+                                cmodcount, oldmodcount, asString(update));
+                        connection.commit();
+                    } catch (SQLException ex) {
+                        continueIfStringOverflow(ex);
+                        connection.rollback();
+                        success = false;
+                    }
+                }
+            }
+            if (! success) {
+                String data = asString(document);
+                success = dbUpdate(connection, tableName, document.getId(), modified, hasBinary, modcount, cmodcount,
+                        oldmodcount, data);
+                connection.commit();
+            }
             return success;
         } catch (SQLException ex) {
             try {
@@ -806,6 +919,68 @@ public class RDBDocumentStore implements
         }
     }
 
+    private static void continueIfStringOverflow(SQLException ex) throws SQLException {
+        String state = ex.getSQLState();
+        if ("22001".equals(state) /* everybody */|| ("72000".equals(state) && 1489 == ex.getErrorCode()) /* Oracle */) {
+            // ok
+        } else {
+            throw (ex);
+        }
+    }
+
+
+    /* currently we use append for all updates, but this might change in the future */
+    private static boolean isAppendableUpdate(UpdateOp update) {
+        return true;
+    }
+
+    /**
+     * Serializes the changes in the {@link UpdateOp} into a JSON array; each entry is another
+     * JSON array holding operation, key, revision, and value.
+     */
+    private static String asString(UpdateOp update) {
+        StringBuilder sb = new StringBuilder("[");
+        boolean needComma = false;
+        for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
+            Operation op = change.getValue();
+            Key key = change.getKey();
+
+            // exclude properties that are serialized into special columns
+            if (MODCOUNT.equals(key.getName()) && null == key.getRevision()) continue;
+            if (MODIFIED.equals(key.getName()) && null == key.getRevision()) continue;
+            if (ID.equals(key.getName()) && null == key.getRevision()) continue;
+
+            // already checked
+            if (op.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY) continue;
+
+            if (needComma) {
+                sb.append(",");
+            }
+            sb.append("[");
+            if (op.type == UpdateOp.Operation.Type.INCREMENT) {
+                sb.append("\"+\",");
+            } else if (op.type == UpdateOp.Operation.Type.SET || op.type == UpdateOp.Operation.Type.SET_MAP_ENTRY) {
+                sb.append("\"=\",");
+            } else if (op.type == UpdateOp.Operation.Type.MAX) {
+                sb.append("\"M\",");
+            } else if (op.type == UpdateOp.Operation.Type.REMOVE_MAP_ENTRY) {
+                sb.append("\"*\",");
+            } else {
+                throw new RuntimeException("Can't serialize " + update.toString() + " for JSON append");
+            }
+            appendString(sb, key.getName());
+            sb.append(",");
+            if (key.getRevision() != null) {
+                appendString(sb, key.getRevision().toString());
+                sb.append(",");
+            }
+            appendValue(sb, op.value);
+            sb.append("]");
+            needComma = true;
+        }
+        return sb.append("]").toString();
+    }
+
     private <T extends Document> void insertDocuments(Collection<T> collection, List<T> documents) {
         Connection connection = null;
         String tableName = getTable(collection);
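
For example (a sketch only; the key names, value, and revision string are invented), an UpdateOp that sets a map entry, increments a counter and applies a MAX would be serialized by the asString(UpdateOp) method above into a batch of roughly this shape:

import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;

public class UpdateOpSerializationExample {
    public static void main(String[] args) throws Exception {
        // roughly what asString(update) emits for
        //   up.setMapEntry("prop", Revision.fromString("r3-0-1"), "bar");
        //   up.increment("c", 1);
        //   up.max("max", 42L);
        String batch = "[[\"=\",\"prop\",\"r3-0-1\",\"bar\"],[\"+\",\"c\",1],[\"M\",\"max\",42]]";
        JSONArray ops = (JSONArray) new JSONParser().parse(batch);
        JSONArray first = (JSONArray) ops.get(0);
        System.out.println(first); // ["=","prop","r3-0-1","bar"]: opcode, key, revision, value
    }
}

Each batch is later appended to the DATA column with a leading comma, so the column stays parseable as the comma-separated sequence that fromDBRow replays.
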
@@ -848,13 +1023,9 @@ public class RDBDocumentStore implements
 
     private static byte[] GZIPSIG = { 31, -117 };
 
-    private String getData(ResultSet rs, int stringIndex, int blobIndex) throws SQLException {
+    private static String fromBlobData(byte[] bdata) {
         try {
-            String data = rs.getString(stringIndex);
-            byte[] bdata = rs.getBytes(blobIndex);
-            if (bdata == null) {
-                return data;
-            } else if (bdata.length >= 2 && bdata[0] == GZIPSIG[0] && bdata[1] == GZIPSIG[1]) {
+            if (bdata.length >= 2 && bdata[0] == GZIPSIG[0] && bdata[1] == GZIPSIG[1]) {
                 // GZIP
                 ByteArrayInputStream bis = new ByteArrayInputStream(bdata);
                 GZIPInputStream gis = new GZIPInputStream(bis, 65536);
@@ -863,7 +1034,7 @@ public class RDBDocumentStore implements
                 return IOUtils.toString(bdata, "UTF-8");
             }
         } catch (IOException e) {
-            throw new SQLException(e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -899,14 +1070,16 @@ public class RDBDocumentStore implements
 
     @CheckForNull
     private DBRow dbRead(Connection connection, String tableName, String id) throws SQLException {
-        PreparedStatement stmt = connection.prepareStatement("select MODIFIED, DATA, BDATA from " + tableName + " where ID = ?");
+        PreparedStatement stmt = connection.prepareStatement("select MODIFIED, MODCOUNT, DATA, BDATA from " + tableName + " where ID = ?");
         try {
             stmt.setString(1, id);
             ResultSet rs = stmt.executeQuery();
             if (rs.next()) {
                 long modified = rs.getLong(1);
-                String data = getData(rs, 2, 3);
-                return new DBRow(id, modified, data);
+                long modcount = rs.getLong(2);
+                String data = rs.getString(3);
+                byte[] bdata = rs.getBytes(4);
+                return new DBRow(id, modified, modcount, data, bdata);
             } else {
                 return null;
             }
@@ -927,7 +1100,7 @@ public class RDBDocumentStore implements
 
     private List<DBRow> dbQuery(Connection connection, String tableName, String minId, String maxId, String indexedProperty,
             long startValue, int limit) throws SQLException {
-        String t = "select ID, MODIFIED, DATA, BDATA from " + tableName + " where ID > ? and ID < ?";
+        String t = "select ID, MODIFIED, MODCOUNT, DATA, BDATA from " + tableName + " where ID > ? and ID < ?";
         if (indexedProperty != null) {
             if (MODIFIED.equals(indexedProperty)) {
                 t += " and MODIFIED >= ?";
@@ -959,8 +1132,10 @@ public class RDBDocumentStore implements
                     throw new DocumentStoreException("unexpected query result: '" + minId + "' < '" + id + "' < '" + maxId + "' - broken DB collation?");
                 }
                 long modified = rs.getLong(2);
-                String data = getData(rs, 3, 4);
-                result.add(new DBRow(id, modified, data));
+                long modcount = rs.getLong(3);
+                String data = rs.getString(4);
+                byte[] bdata = rs.getBytes(5);
+                result.add(new DBRow(id, modified, modcount, data, bdata));
             }
         } finally {
             stmt.close();
@@ -1006,6 +1181,36 @@ public class RDBDocumentStore implements
         }
     }
 
+    private boolean dbAppendingUpdate(Connection connection, String tableName, String id, Long modified, Boolean hasBinary, Long modcount, Long cmodcount, Long oldmodcount,
+            String appendData) throws SQLException {
+        String t = "update " + tableName + " set MODIFIED = ?, HASBINARY = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = DSIZE + ?, DATA = DATA || ? where ID = ?";
+        if (oldmodcount != null) {
+            t += " and MODCOUNT = ?";
+        }
+        PreparedStatement stmt = connection.prepareStatement(t);
+        try {
+            int si = 1;
+            stmt.setObject(si++, modified, Types.BIGINT);
+            stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
+            stmt.setObject(si++, modcount, Types.BIGINT);
+            stmt.setObject(si++, cmodcount == null ? 0 : cmodcount, Types.BIGINT);
+            stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
+            stmt.setString(si++, "," + appendData);
+            stmt.setString(si++, id);
+            if (oldmodcount != null) {
+                stmt.setObject(si++, oldmodcount, Types.BIGINT);
+            }
+            int result = stmt.executeUpdate();
+            if (result != 1) {
+                LOG.debug("DB append update failed for " + tableName + "/" + id + " with oldmodcount=" + oldmodcount);
+            }
+            return result == 1;
+        } 
+        finally {
+            stmt.close();
+        }
+    }
+
     private boolean dbInsert(Connection connection, String tableName, String id, Long modified, Boolean hasBinary, Long modcount,
             Long cmodcount, String data) throws SQLException {
         PreparedStatement stmt = connection.prepareStatement("insert into " + tableName
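
As a sketch (not part of the commit; "NODES" only stands in for the actual table name), the statement dbAppendingUpdate prepares when an oldmodcount is supplied looks like this; the MODCOUNT predicate provides the optimistic-concurrency check and the DATA concatenation adds one more update batch:

// Effective SQL; parameters in order: modified, hasBinary, modcount, cmodcount,
// DSIZE delta (1 + appendData.length()), "," + appendData, id, oldmodcount.
String appendSql = "update NODES set MODIFIED = ?, HASBINARY = ?, MODCOUNT = ?, CMODCOUNT = ?,"
        + " DSIZE = DSIZE + ?, DATA = DATA || ? where ID = ? and MODCOUNT = ?";

updateDocument only takes this path for appendable updates whose serialization stays below datalimit and whose modcount is not a multiple of 16; otherwise, or when the concatenation overflows the column (SQLSTATE 22001, or Oracle 72000/1489 as recognized by continueIfStringOverflow), it falls back to a full dbUpdate with the complete document serialization.
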
@@ -1237,12 +1442,15 @@ public class RDBDocumentStore implements
 
     private static class DBRow {
         public final String id, data;
-        public final long modified;
-
-        public DBRow(String id, long modified, String data) {
+        public final long modified, modcount;
+        public final byte[] bdata;
+        
+        public DBRow(String id, long modified, long modcount, String data, byte[] bdata) {
             this.id = id;
             this.modified = modified;
+            this.modcount = modcount;
             this.data = data;
+            this.bdata = bdata;
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1630901&r1=1630900&r2=1630901&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Fri Oct 10 14:08:50 2014
@@ -31,7 +31,10 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
@@ -393,6 +396,7 @@ public class BasicDocumentStoreTest exte
         long duration = 1000;
         long end = System.currentTimeMillis() + duration;
         long cnt = 0;
+        Set<Revision> expectedRevs = new HashSet<Revision>();
 
         String id = this.getClass().getName() + ".testUpdatePerf" + (growing ? "Growing" : "") + "-" + size;
         removeMe.add(id);
@@ -404,6 +408,9 @@ public class BasicDocumentStoreTest exte
                 Revision r = new Revision(System.currentTimeMillis(), (int) cnt, 1);
                 up.setMapEntry("foo", r, pval);
                 up.setMapEntry("_commitRoot", r, "1");
+                up.increment("c", 1);
+                up.max("max", System.currentTimeMillis());
+                expectedRevs.add(r);
             } else {
                 up.set("foo", pval);
             }
@@ -416,6 +423,13 @@ public class BasicDocumentStoreTest exte
             cnt += 1;
         }
 
+        if (growing) {
+            NodeDocument result = super.ds.find(Collection.NODES, id, 0);
+            Map<Revision, Object> m = (Map<Revision, Object>)result.get("foo");
+            assertEquals("number of revisions", expectedRevs.size(), m.size());
+            assertTrue(m.keySet().equals(expectedRevs));
+        }
+
         LOG.info("document updates with property of size " + size + (growing ? " (growing)" : "") + " for " + super.dsname
                 + " was " + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
     }

