Author: reschke
Date: Fri Oct 10 14:08:50 2014
New Revision: 1630901
URL: http://svn.apache.org/r1630901
Log:
OAK-1941 - tune table model to persist base JSON + serialized updates
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1630901&r1=1630900&r2=1630901&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Fri Oct 10 14:08:50 2014
@@ -67,6 +67,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
@@ -135,13 +136,14 @@ import com.google.common.util.concurrent
* <tr>
* <th>DSIZE</th>
* <td>bigint</td>
- * <td>the size of the document's JSON serialization (for debugging
purposes)</td>
+ * <td>the approximate size of the document's JSON serialization (for
debugging purposes)</td>
* </tr>
* <tr>
* <th>DATA</th>
* <td>varchar(16384)</td>
* <td>the document's JSON serialization (only used for small document sizes,
in
- * which case BDATA (below) is not set</td>
+ * which case BDATA (below) is not set), or a sequence of JSON serialized
update operations
+ * to be applied against the last full serialization</td>
* </tr>
* <tr>
* <th>BDATA</th>
@@ -320,6 +322,7 @@ public class RDBDocumentStore implements
private static final String MODIFIED = "_modified";
private static final String MODCOUNT = "_modCount";
+ // document key holding the document's ID; like MODIFIED and MODCOUNT it is kept in its own table column
+ private static final String ID = "_id";
private static final Logger LOG =
LoggerFactory.getLogger(RDBDocumentStore.class);
@@ -350,7 +353,7 @@ public class RDBDocumentStore implements
NodeDocument.HAS_BINARY_FLAG }));
// set of properties not serialized to JSON
- private static Set<String> COLUMNPROPERTIES = new HashSet<String>(Arrays.asList(new String[] { "_id", MODIFIED }));
+ // (their values are stored in dedicated table columns instead; the set is a constant and must not be reassigned)
+ private static final Set<String> COLUMNPROPERTIES = new HashSet<String>(Arrays.asList(new String[] { ID, MODIFIED, MODCOUNT }));
private void initialize(DataSource ds, DocumentMK.Builder builder,
RDBOptions options) throws Exception {
@@ -592,7 +595,7 @@ public class RDBDocumentStore implements
int retries = maxRetries;
while (!success && retries > 0) {
long lastmodcount = (Long) oldDoc.get(MODCOUNT);
- success = updateDocument(collection, doc, lastmodcount);
+ success = updateDocument(collection, doc, update,
lastmodcount);
if (!success) {
retries -= 1;
oldDoc = readDocumentCached(collection,
update.getId(), Integer.MAX_VALUE);
@@ -701,11 +704,27 @@ public class RDBDocumentStore implements
private <T extends Document> T fromDBRow(Collection<T> collection, DBRow
row) throws ParseException {
T doc = collection.newDocument(this);
- doc.put("_id", row.id);
- doc.put("_modified", row.modified);
+ doc.put(ID, row.id);
+ doc.put(MODIFIED, row.modified);
+ doc.put(MODCOUNT, row.modcount);
- Map<String, Object> obj = (Map<String, Object>) new
JSONParser().parse(row.data);
- for (Map.Entry<String, Object> entry : obj.entrySet()) {
+ JSONParser jp = new JSONParser();
+
+ Map<String, Object> baseData = null;
+ if (row.bdata != null && row.bdata.length != 0) {
+ baseData = (Map<String, Object>) jp.parse(fromBlobData(row.bdata));
+ }
+ // TODO figure out a faster way
+ JSONArray arr = (JSONArray) new JSONParser().parse("[" + row.data +
"]");
+
+ int updatesStartAt = 0;
+ if (baseData == null) {
+ // if we do not have a blob, the first part of the string data is
the base JSON
+ baseData = (Map<String, Object>)arr.get(0);
+ updatesStartAt = 1;
+ }
+
+ for (Map.Entry<String, Object> entry : baseData.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value == null) {
@@ -719,6 +738,79 @@ public class RDBDocumentStore implements
throw new RuntimeException("unexpected type: " +
value.getClass());
}
}
+
+ for (int u = updatesStartAt; u < arr.size(); u++) {
+ Object ob = arr.get(u);
+ if (ob instanceof JSONArray) {
+ JSONArray update = (JSONArray)ob;
+ for (int o = 0; o < update.size(); o++) {
+ JSONArray op = (JSONArray)update.get(o);
+ String opcode = op.get(0).toString();
+ String key = op.get(1).toString();
+ Revision rev = null;
+ Object value = null;
+ if (op.size() == 3) {
+ value = op.get(2);
+ }
+ else {
+ rev = Revision.fromString(op.get(2).toString());
+ value = op.get(3);
+ }
+ Object old = doc.get(key);
+
+ if ("=".equals(opcode)) {
+ if (rev == null) {
+ doc.put(key, value);
+ } else {
+ @SuppressWarnings("unchecked")
+ Map<Revision, Object> m = (Map<Revision, Object>)
old;
+ if (m == null) {
+ m = new TreeMap<Revision, Object>(comparator);
+ doc.put(key, m);
+ }
+ m.put(rev, value);
+ }
+ } else if ("*".equals(opcode)) {
+ if (rev == null) {
+ throw new RuntimeException("unexpected operation "
+ op + "in: " + ob);
+ } else {
+ @SuppressWarnings("unchecked")
+ Map<Revision, Object> m = (Map<Revision, Object>)
old;
+ if (m != null) {
+ m.remove(rev);
+ }
+ }
+ } else if ("+".equals(opcode)) {
+ if (rev == null) {
+ Long x = (Long) value;
+ if (old == null) {
+ old = 0L;
+ }
+ doc.put(key, ((Long) old) + x);
+ } else {
+ throw new RuntimeException("unexpected operation "
+ op + "in: " + ob);
+ }
+ } else if ("M".equals(opcode)) {
+ if (rev == null) {
+ Comparable newValue = (Comparable) value;
+ if (old == null || newValue.compareTo(old) > 0) {
+ doc.put(key, value);
+ }
+ } else {
+ throw new RuntimeException("unexpected operation "
+ op + "in: " + ob);
+ }
+ } else {
+ throw new RuntimeException("unexpected operation " +
op + "in: " + ob);
+ }
+ }
+ }
+ else if (ob.toString().equals("blob") && u == 0) {
+ // expected placeholder
+ }
+ else {
+ throw new RuntimeException("unexpected: " + ob);
+ }
+ }
return doc;
}
@@ -778,19 +870,40 @@ public class RDBDocumentStore implements
}
}
- private <T extends Document> boolean updateDocument(@Nonnull Collection<T>
collection, @Nonnull T document, Long oldmodcount) {
+ private <T extends Document> boolean updateDocument(@Nonnull Collection<T>
collection, @Nonnull T document,
+ @Nonnull UpdateOp update, Long oldmodcount) {
Connection connection = null;
String tableName = getTable(collection);
try {
connection = getConnection();
- String data = asString(document);
Long modified = (Long) document.get(MODIFIED);
Number flag = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
Boolean hasBinary = flag == null ? false : flag.intValue() ==
NodeDocument.HAS_BINARY_VAL;
Long modcount = (Long) document.get(MODCOUNT);
Long cmodcount = (Long)
document.get(NodeDocument.COLLISIONSMODCOUNT);
- boolean success = dbUpdate(connection, tableName,
document.getId(), modified, hasBinary, modcount, cmodcount, oldmodcount, data);
- connection.commit();
+ boolean success = false;
+
+ // every 16th update is a full rewrite
+ if (isAppendableUpdate(update) && modcount % 16 != 0) {
+ String appendData = asString(update);
+ if (appendData.length() < datalimit) {
+ try {
+ success = dbAppendingUpdate(connection, tableName,
document.getId(), modified, hasBinary, modcount,
+ cmodcount, oldmodcount, asString(update));
+ connection.commit();
+ } catch (SQLException ex) {
+ continueIfStringOverflow(ex);
+ connection.rollback();
+ success = false;
+ }
+ }
+ }
+ if (! success) {
+ String data = asString(document);
+ success = dbUpdate(connection, tableName, document.getId(),
modified, hasBinary, modcount, cmodcount,
+ oldmodcount, data);
+ connection.commit();
+ }
return success;
} catch (SQLException ex) {
try {
@@ -806,6 +919,68 @@ public class RDBDocumentStore implements
}
}
+ /**
+  * Returns normally if the exception reports a string overflow, so the caller can
+  * fall back to another strategy; otherwise the exception is rethrown unchanged.
+  * A string overflow is recognized as SQLState 22001 (any database), or as
+  * SQLState 72000 with vendor error code 1489 (Oracle).
+  *
+  * @param ex the exception to inspect
+  * @throws SQLException {@code ex} itself, if it does not report a string overflow
+  */
+ private static void continueIfStringOverflow(SQLException ex) throws SQLException {
+     String sqlState = ex.getSQLState();
+     boolean standardOverflow = "22001".equals(sqlState);
+     boolean oracleOverflow = "72000".equals(sqlState) && ex.getErrorCode() == 1489;
+     if (!standardOverflow && !oracleOverflow) {
+         throw ex;
+     }
+ }
+
+
+ /**
+  * Tells whether the given update may be persisted by appending its serialized
+  * operations to the existing row data instead of rewriting the whole document.
+  * Currently every update is considered appendable; this might change in the future.
+  */
+ private static boolean isAppendableUpdate(UpdateOp update) {
+ return true;
+ }
+
+ /**
+  * Serializes the changes in the {@link UpdateOp} into a JSON array of operations.
+  * Each operation is itself a JSON array holding the opcode, the property name,
+  * the revision (only for revision-keyed entries) and the value.
+  * <p>
+  * Opcodes: {@code "="} (SET, SET_MAP_ENTRY), {@code "+"} (INCREMENT),
+  * {@code "M"} (MAX), {@code "*"} (REMOVE_MAP_ENTRY).
+  * <p>
+  * Two kinds of change are skipped:
+  * <ul>
+  * <li>non-revisioned changes to properties listed in COLUMNPROPERTIES, because
+  * those values are kept in dedicated columns;</li>
+  * <li>CONTAINS_MAP_ENTRY conditions, because they are checks rather than changes.</li>
+  * </ul>
+  *
+  * @throws RuntimeException if the update contains any other operation type
+  */
+ private static String asString(UpdateOp update) {
+     StringBuilder json = new StringBuilder();
+     json.append('[');
+     String separator = "";
+     for (Map.Entry<Key, Operation> entry : update.getChanges().entrySet()) {
+         Key key = entry.getKey();
+         Operation operation = entry.getValue();
+         Revision revision = key.getRevision();
+
+         if (revision == null && COLUMNPROPERTIES.contains(key.getName())) {
+             continue;
+         }
+
+         String opcode;
+         switch (operation.type) {
+             case CONTAINS_MAP_ENTRY:
+                 // condition only, already checked; nothing to persist
+                 continue;
+             case INCREMENT:
+                 opcode = "+";
+                 break;
+             case SET:
+             case SET_MAP_ENTRY:
+                 opcode = "=";
+                 break;
+             case MAX:
+                 opcode = "M";
+                 break;
+             case REMOVE_MAP_ENTRY:
+                 opcode = "*";
+                 break;
+             default:
+                 throw new RuntimeException("Can't serialize " + update.toString() + " for JSON append");
+         }
+
+         json.append(separator).append("[\"").append(opcode).append("\",");
+         appendString(json, key.getName());
+         json.append(',');
+         if (revision != null) {
+             appendString(json, revision.toString());
+             json.append(',');
+         }
+         appendValue(json, operation.value);
+         json.append(']');
+         separator = ",";
+     }
+     return json.append(']').toString();
+ }
+
private <T extends Document> void insertDocuments(Collection<T>
collection, List<T> documents) {
Connection connection = null;
String tableName = getTable(collection);
@@ -848,13 +1023,9 @@ public class RDBDocumentStore implements
private static byte[] GZIPSIG = { 31, -117 };
- private String getData(ResultSet rs, int stringIndex, int blobIndex)
throws SQLException {
+ private static String fromBlobData(byte[] bdata) {
try {
- String data = rs.getString(stringIndex);
- byte[] bdata = rs.getBytes(blobIndex);
- if (bdata == null) {
- return data;
- } else if (bdata.length >= 2 && bdata[0] == GZIPSIG[0] && bdata[1]
== GZIPSIG[1]) {
+ if (bdata.length >= 2 && bdata[0] == GZIPSIG[0] && bdata[1] ==
GZIPSIG[1]) {
// GZIP
ByteArrayInputStream bis = new ByteArrayInputStream(bdata);
GZIPInputStream gis = new GZIPInputStream(bis, 65536);
@@ -863,7 +1034,7 @@ public class RDBDocumentStore implements
return IOUtils.toString(bdata, "UTF-8");
}
} catch (IOException e) {
- throw new SQLException(e);
+ throw new RuntimeException(e);
}
}
@@ -899,14 +1070,16 @@ public class RDBDocumentStore implements
@CheckForNull
private DBRow dbRead(Connection connection, String tableName, String id)
throws SQLException {
- PreparedStatement stmt = connection.prepareStatement("select MODIFIED,
DATA, BDATA from " + tableName + " where ID = ?");
+ PreparedStatement stmt = connection.prepareStatement("select MODIFIED,
MODCOUNT, DATA, BDATA from " + tableName + " where ID = ?");
try {
stmt.setString(1, id);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
long modified = rs.getLong(1);
- String data = getData(rs, 2, 3);
- return new DBRow(id, modified, data);
+ long modcount = rs.getLong(2);
+ String data = rs.getString(3);
+ byte[] bdata = rs.getBytes(4);
+ return new DBRow(id, modified, modcount, data, bdata);
} else {
return null;
}
@@ -927,7 +1100,7 @@ public class RDBDocumentStore implements
private List<DBRow> dbQuery(Connection connection, String tableName,
String minId, String maxId, String indexedProperty,
long startValue, int limit) throws SQLException {
- String t = "select ID, MODIFIED, DATA, BDATA from " + tableName + "
where ID > ? and ID < ?";
+ String t = "select ID, MODIFIED, MODCOUNT, DATA, BDATA from " +
tableName + " where ID > ? and ID < ?";
if (indexedProperty != null) {
if (MODIFIED.equals(indexedProperty)) {
t += " and MODIFIED >= ?";
@@ -959,8 +1132,10 @@ public class RDBDocumentStore implements
throw new DocumentStoreException("unexpected query result:
'" + minId + "' < '" + id + "' < '" + maxId + "' - broken DB collation?");
}
long modified = rs.getLong(2);
- String data = getData(rs, 3, 4);
- result.add(new DBRow(id, modified, data));
+ long modcount = rs.getLong(3);
+ String data = rs.getString(4);
+ byte[] bdata = rs.getBytes(5);
+ result.add(new DBRow(id, modified, modcount, data, bdata));
}
} finally {
stmt.close();
@@ -1006,6 +1181,36 @@ public class RDBDocumentStore implements
}
}
+ /**
+  * Appends serialized update operations to the DATA column of a row. In the
+  * same statement it sets MODIFIED, HASBINARY, MODCOUNT and CMODCOUNT, and
+  * increases DSIZE.
+  * <p>
+  * NOTE(review): {@code ||} is standard SQL concatenation but is not portable
+  * (e.g. MySQL treats it as logical OR by default, SQL Server uses {@code +}).
+  * Confirm for every supported database.
+  *
+  * @param appendData JSON serialization of the update operations; it is appended
+  *        with a leading comma, so DATA stays a comma-separated sequence of JSON values
+  * @param oldmodcount if not {@code null}, the row is only updated when its current
+  *        MODCOUNT equals this value
+  * @return {@code true} if exactly one row was updated
+  * @throws SQLException on database errors (string overflow is left to the caller to handle)
+  */
+ private boolean dbAppendingUpdate(Connection connection, String tableName, String id, Long modified, Boolean hasBinary, Long modcount, Long cmodcount, Long oldmodcount,
+         String appendData) throws SQLException {
+     String t = "update " + tableName + " set MODIFIED = ?, HASBINARY = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = DSIZE + ?, DATA = DATA || ? where ID = ?";
+     if (oldmodcount != null) {
+         // conditional update: fails (0 rows) when the row was modified concurrently
+         t += " and MODCOUNT = ?";
+     }
+     String suffix = "," + appendData;
+     PreparedStatement stmt = connection.prepareStatement(t);
+     try {
+         int si = 1;
+         stmt.setObject(si++, modified, Types.BIGINT);
+         stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
+         stmt.setObject(si++, modcount, Types.BIGINT);
+         stmt.setObject(si++, cmodcount == null ? 0 : cmodcount, Types.BIGINT);
+         // DSIZE is approximate: it counts characters, not bytes
+         stmt.setObject(si++, suffix.length(), Types.BIGINT);
+         stmt.setString(si++, suffix);
+         stmt.setString(si++, id);
+         if (oldmodcount != null) {
+             stmt.setObject(si++, oldmodcount, Types.BIGINT);
+         }
+         int result = stmt.executeUpdate();
+         if (result != 1) {
+             LOG.debug("DB append update failed for {}/{} with oldmodcount={}", tableName, id, oldmodcount);
+         }
+         return result == 1;
+     } finally {
+         stmt.close();
+     }
+ }
+
private boolean dbInsert(Connection connection, String tableName, String
id, Long modified, Boolean hasBinary, Long modcount,
Long cmodcount, String data) throws SQLException {
PreparedStatement stmt = connection.prepareStatement("insert into " +
tableName
@@ -1237,12 +1442,15 @@ public class RDBDocumentStore implements
+ /**
+  * Column values of a single document row, as read by dbRead and dbQuery.
+  * <ul>
+  * <li>{@code data}: the DATA column. It holds JSON text, possibly followed by
+  * appended, comma-separated serialized update operations.</li>
+  * <li>{@code bdata}: the raw BDATA column. It may be {@code null} or empty
+  * (treated as absent) and may be GZIP-compressed.</li>
+  * </ul>
+  */
private static class DBRow {
public final String id, data;
- public final long modified;
-
- public DBRow(String id, long modified, String data) {
+ public final long modified, modcount;
+ public final byte[] bdata;
+
+ public DBRow(String id, long modified, long modcount, String data,
byte[] bdata) {
this.id = id;
this.modified = modified;
+ this.modcount = modcount;
this.data = data;
+ this.bdata = bdata;
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1630901&r1=1630900&r2=1630901&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
Fri Oct 10 14:08:50 2014
@@ -31,7 +31,10 @@ import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
@@ -393,6 +396,7 @@ public class BasicDocumentStoreTest exte
long duration = 1000;
long end = System.currentTimeMillis() + duration;
long cnt = 0;
+ Set<Revision> expectedRevs = new HashSet<Revision>();
String id = this.getClass().getName() + ".testUpdatePerf" + (growing ?
"Growing" : "") + "-" + size;
removeMe.add(id);
@@ -404,6 +408,9 @@ public class BasicDocumentStoreTest exte
Revision r = new Revision(System.currentTimeMillis(), (int)
cnt, 1);
up.setMapEntry("foo", r, pval);
up.setMapEntry("_commitRoot", r, "1");
+ up.increment("c", 1);
+ up.max("max", System.currentTimeMillis());
+ expectedRevs.add(r);
} else {
up.set("foo", pval);
}
@@ -416,6 +423,13 @@ public class BasicDocumentStoreTest exte
cnt += 1;
}
+ if (growing) {
+ NodeDocument result = super.ds.find(Collection.NODES, id, 0);
+ Map<Revision, Object> m = (Map<Revision, Object>)result.get("foo");
+ assertEquals("number of revisions", expectedRevs.size(), m.size());
+ assertTrue(m.keySet().equals(expectedRevs));
+ }
+
LOG.info("document updates with property of size " + size + (growing ?
" (growing)" : "") + " for " + super.dsname
+ " was " + cnt + " in " + duration + "ms (" + (cnt /
(duration / 1000f)) + "/s)");
}