Added:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1659616&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
(added)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Fri Feb 13 17:22:26 2015
@@ -0,0 +1,1678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.rdb;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPOutputStream;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import javax.sql.DataSource;
+
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.cache.CacheValue;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.cache.Cache;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+
+/**
+ * Implementation of {@link CachingDocumentStore} for relational databases.
+ *
+ * <h3>Supported Databases</h3>
+ * <p>
+ * The code is supposed to be sufficiently generic to run with a variety of
+ * database implementations. However, the tables are created when required to
+ * simplify testing, and <em>that</em> code specifically supports these
+ * databases:
+ * <ul>
+ * <li>h2</li>
+ * <li>IBM DB2</li>
+ * <li>Postgres</li>
+ * <li>MariaDB (MySQL) (experimental)</li>
+ * <li>Oracle (experimental)</li>
+ * </ul>
+ *
+ * <h3>Table Layout</h3>
+ * <p>
+ * Data for each of the DocumentStore's {@link Collection}s is stored in its
own
+ * database table (with a name matching the collection).
+ * <p>
+ * The tables essentially implement key/value storage, where the key usually is
+ * derived from an Oak path, and the value is a serialization of a
+ * {@link Document} (or a part of one). Additional fields are used for queries,
+ * debugging, and concurrency control:
+ * <table style="text-align: left;">
+ * <thead>
+ * <tr>
+ * <th>Column</th>
+ * <th>Type</th>
+ * <th>Description</th>
+ * </tr>
+ * </thead> <tbody>
+ * <tr>
+ * <th>ID</th>
+ * <td>varchar(512) not null primary key</td>
+ * <td>the document's key (for databases that can not handle 512 character
+ * primary keys, such as MySQL, varbinary is possible as well; note that this
+ * currently needs to be hardcoded)</td>
+ * </tr>
+ * <tr>
+ * <th>MODIFIED</th>
+ * <td>bigint</td>
+ * <td>low-resolution timestamp
+ * </tr>
+ * <tr>
+ * <th>HASBINARY</th>
+ * <td>smallint</td>
+ * <td>flag indicating whether the document has binary properties
+ * </tr>
+ * <tr>
+ * <th>DELETEDONCE</th>
+ * <td>smallint</td>
+ * <td>flag indicating whether the document has been deleted once
+ * </tr>
+ * <tr>
+ * <th>MODCOUNT</th>
+ * <td>bigint</td>
+ * <td>modification counter, used for avoiding overlapping updates</td>
+ * </tr>
+ * <tr>
+ * <th>DSIZE</th>
+ * <td>bigint</td>
+ * <td>the approximate size of the document's JSON serialization (for debugging
+ * purposes)</td>
+ * </tr>
+ * <tr>
+ * <th>DATA</th>
+ * <td>varchar(16384)</td>
+ * <td>the document's JSON serialization (only used for small document sizes,
in
+ * which case BDATA (below) is not set), or a sequence of JSON serialized
update
+ * operations to be applied against the last full serialization</td>
+ * </tr>
+ * <tr>
+ * <th>BDATA</th>
+ * <td>blob</td>
+ * <td>the document's JSON serialization (usually GZIPped, only used for
"large"
+ * documents)</td>
+ * </tr>
+ * </tbody>
+ * </table>
+ * <p>
+ * The names of database tables can be prefixed; the purpose is mainly for
+ * testing, as tables can also be dropped automatically when the store is
+ * disposed (this only happens for those tables that have been created on
+ * demand)
+ * <p>
+ * <em>Note that the database needs to be created/configured to support all
Unicode
+ * characters in text fields, and to collate by Unicode code point (in DB2:
"collate using identity",
+ * in Postgres: "C").
+ * THIS IS NOT THE DEFAULT!</em>
+ * <p>
+ * <em>For MySQL, the database parameter "max_allowed_packet" needs to be
increased to support ~16 blobs.</em>
+ *
+ * <h3>Caching</h3>
+ * <p>
+ * The cache borrows heavily from the {@link MongoDocumentStore}
implementation;
+ * however it does not support the off-heap mechanism yet.
+ *
+ * <h3>Queries</h3>
+ * <p>
+ * The implementation currently supports only three indexed properties:
+ * "_bin", "deletedOnce", and "_modified". Attempts to use a different indexed
property will
+ * cause a {@link DocumentStoreException}.
+ */
+public class RDBDocumentStore implements DocumentStore {
+
+ /**
+ * Creates a {@linkplain RDBDocumentStore} instance using the provided
+ * {@link DataSource}, {@link DocumentMK.Builder}, and {@link RDBOptions}.
+ */
+ public RDBDocumentStore(DataSource ds, DocumentMK.Builder builder,
RDBOptions options) {
+ try {
+ initialize(ds, builder, options);
+ } catch (Exception ex) {
+ throw new DocumentStoreException("initializing RDB document
store", ex);
+ }
+ }
+
+ /**
+ * Creates a {@linkplain RDBDocumentStore} instance using the provided
+ * {@link DataSource}, {@link DocumentMK.Builder}, and default
+ * {@link RDBOptions}.
+ */
+ public RDBDocumentStore(DataSource ds, DocumentMK.Builder builder) {
+ this(ds, builder, new RDBOptions());
+ }
+
+ @Override
+ public <T extends Document> T find(Collection<T> collection, String id) {
+ return find(collection, id, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public <T extends Document> T find(final Collection<T> collection, final
String id, int maxCacheAge) {
+ return readDocumentCached(collection, id, maxCacheAge);
+ }
+
+ @Nonnull
+ @Override
+ public <T extends Document> List<T> query(Collection<T> collection, String
fromKey, String toKey, int limit) {
+ return query(collection, fromKey, toKey, null, 0, limit);
+ }
+
+ @Nonnull
+ @Override
+ public <T extends Document> List<T> query(Collection<T> collection, String
fromKey, String toKey, String indexedProperty,
+ long startValue, int limit) {
+ return internalQuery(collection, fromKey, toKey, indexedProperty,
startValue, limit);
+ }
+
+ @Override
+ public <T extends Document> void remove(Collection<T> collection, String
id) {
+ delete(collection, id);
+ invalidateCache(collection, id);
+ }
+
+ @Override
+ public <T extends Document> void remove(Collection<T> collection,
List<String> ids) {
+ for (String id : ids) {
+ invalidateCache(collection, id);
+ }
+ delete(collection, ids);
+ }
+
+ @Override
+ public <T extends Document> boolean create(Collection<T> collection,
List<UpdateOp> updateOps) {
+ return internalCreate(collection, updateOps);
+ }
+
+ @Override
+ public <T extends Document> void update(Collection<T> collection,
List<String> keys, UpdateOp updateOp) {
+ internalUpdate(collection, keys, updateOp);
+ }
+
+ @Override
+ public <T extends Document> T createOrUpdate(Collection<T> collection,
UpdateOp update) {
+ return internalCreateOrUpdate(collection, update, true, false);
+ }
+
+ @Override
+ public <T extends Document> T findAndUpdate(Collection<T> collection,
UpdateOp update) {
+ return internalCreateOrUpdate(collection, update, false, true);
+ }
+
+ @Override
+ public void invalidateCache() {
+ nodesCache.invalidateAll();
+ }
+
+ @Override
+ public <T extends Document> void invalidateCache(Collection<T> collection,
String id) {
+ if (collection == Collection.NODES) {
+ Lock lock = getAndLock(id);
+ try {
+ nodesCache.invalidate(new StringValue(id));
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ // used for diagnostics
+ private String droppedTables = "";
+
+ public String getDroppedTables() {
+ return this.droppedTables;
+ }
+
+ @Override
+ public void dispose() {
+ if (!this.tablesToBeDropped.isEmpty()) {
+ String dropped = "";
+ LOG.debug("attempting to drop: " + this.tablesToBeDropped);
+ for (String tname : this.tablesToBeDropped) {
+ Connection con = null;
+ try {
+ con = this.ch.getRWConnection();
+ try {
+ Statement stmt = con.createStatement();
+ stmt.execute("drop table " + tname);
+ stmt.close();
+ con.commit();
+ dropped += tname + " ";
+ } catch (SQLException ex) {
+ LOG.debug("attempting to drop: " + tname);
+ }
+ } catch (SQLException ex) {
+ LOG.debug("attempting to drop: " + tname);
+ } finally {
+ try {
+ if (con != null) {
+ con.close();
+ }
+ } catch (SQLException ex) {
+ LOG.debug("on close ", ex);
+ }
+ }
+ }
+ this.droppedTables = dropped.trim();
+ }
+ this.ch = null;
+ }
+
+ @Override
+ public <T extends Document> T getIfCached(Collection<T> collection, String
id) {
+ if (collection != Collection.NODES) {
+ return null;
+ } else {
+ NodeDocument doc = nodesCache.getIfPresent(new StringValue(id));
+ return castAsT(doc);
+ }
+ }
+
+ @Override
+ public CacheStats getCacheStats() {
+ return this.cacheStats;
+ }
+
+ // implementation
+
+ enum FETCHFIRSTSYNTAX { FETCHFIRST, LIMIT, TOP};
+
+ /**
+ * Defines variation in the capabilities of different RDBs.
+ */
+ enum DB {
+ DEFAULT("default") {
+ },
+
+ H2("H2"),
+
+ POSTGRES("PostgreSQL") {
+ @Override
+ public String getTableCreationStatement(String tableName) {
+ return ("create table " + tableName + " (ID varchar(512) not
null primary key, MODIFIED bigint, HASBINARY smallint, DELETEDONCE smallint,
MODCOUNT bigint, CMODCOUNT bigint, DSIZE bigint, DATA varchar(16384), BDATA
bytea)");
+ }
+ },
+
+ DB2("DB2"),
+
+ ORACLE("Oracle") {
+ @Override
+ public String getInitializationStatement() {
+ // see https://issues.apache.org/jira/browse/OAK-1914
+ // for some reason, the default for NLS_SORT is incorrect
+ return ("ALTER SESSION SET NLS_SORT='BINARY'");
+ }
+
+ @Override
+ public String getTableCreationStatement(String tableName) {
+ // see https://issues.apache.org/jira/browse/OAK-1914
+ return ("create table " + tableName + " (ID varchar(512) not
null primary key, MODIFIED number, HASBINARY number, DELETEDONCE number,
MODCOUNT number, CMODCOUNT number, DSIZE number, DATA varchar(4000), BDATA
blob)");
+ }
+ },
+
+ MYSQL("MySQL") {
+ @Override
+ public boolean isPrimaryColumnByteEncoded() {
+ // TODO: we should dynamically detect this
+ return true;
+ }
+
+ @Override
+ public String getTableCreationStatement(String tableName) {
+ // see https://issues.apache.org/jira/browse/OAK-1913
+ return ("create table " + tableName + " (ID varbinary(512) not
null primary key, MODIFIED bigint, HASBINARY smallint, DELETEDONCE smallint,
MODCOUNT bigint, CMODCOUNT bigint, DSIZE bigint, DATA varchar(16000), BDATA
longblob)");
+ }
+
+ @Override
+ public FETCHFIRSTSYNTAX getFetchFirstSyntax() {
+ return FETCHFIRSTSYNTAX.LIMIT;
+ }
+
+ @Override
+ public String getConcatQueryString(int dataOctetLimit, int
dataLength) {
+ return "CONCAT(DATA, ?)";
+ }
+ },
+
+ MSSQL("Microsoft SQL Server") {
+ @Override
+ public String getTableCreationStatement(String tableName) {
+ // see https://issues.apache.org/jira/browse/OAK-2395
+ return ("create table " + tableName + " (ID nvarchar(512) not
null primary key, MODIFIED bigint, HASBINARY smallint, DELETEDONCE smallint,
MODCOUNT bigint, CMODCOUNT bigint, DSIZE bigint, DATA nvarchar(4000), BDATA
varbinary(max))");
+ }
+
+ @Override
+ public FETCHFIRSTSYNTAX getFetchFirstSyntax() {
+ return FETCHFIRSTSYNTAX.TOP;
+ }
+
+ @Override
+ public String getConcatQueryString(int dataOctetLimit, int
dataLength) {
+ /*
+ * To avoid truncation when concatenating force an error when
+ * limit is above the octet limit
+ */
+ return "CASE WHEN LEN(DATA) <= " + (dataOctetLimit -
dataLength) + " THEN (DATA + CAST(? AS nvarchar("
+ + dataOctetLimit + "))) ELSE (DATA + CAST(DATA AS
nvarchar(max))) END";
+
+ }
+
+ @Override
+ public String getGreatestQueryString(String column) {
+ return "(select MAX(mod) from (VALUES (" + column + "), (?))
AS ALLMOD(mod))";
+ }
+ };
+
+ /**
+ * If the primary column is encoded in bytes.
+ * Default false
+ * @return boolean
+ */
+ public boolean isPrimaryColumnByteEncoded() {
+ return false;
+ }
+
+ /**
+ * Allows case in select. Default true.
+ */
+ public boolean allowsCaseInSelect() {
+ return true;
+ }
+
+ /**
+ * Query syntax for "FETCH FIRST"
+ */
+ public FETCHFIRSTSYNTAX getFetchFirstSyntax() {
+ return FETCHFIRSTSYNTAX.FETCHFIRST;
+ }
+
+ /**
+ * Returns the CONCAT function or its equivalent function or sub-query.
+ * Note that the function MUST NOT cause a truncated value to be
+ * written!
+ *
+ * @param dataOctetLimit
+ * expected capacity of data column
+ * @param dataLength
+ * length of string to be inserted
+ *
+ * @return the concat query string
+ */
+ public String getConcatQueryString(int dataOctetLimit, int dataLength)
{
+ return "DATA || CAST(? AS varchar(" + dataOctetLimit + "))";
+ }
+
+ /**
+ * Returns the GREATEST function or its equivalent function or
sub-query
+ * supported.
+ *
+ * @return the greatest query string
+ */
+ public String getGreatestQueryString(String column) {
+ return "GREATEST(" + column + ", ?)";
+ }
+
+ /**
+ * Query for any required initialization of the DB.
+ *
+ * @return the DB initialization SQL string
+ */
+ public @Nonnull String getInitializationStatement() {
+ return "";
+ }
+
+ /**
+ * Table creation statement string
+ *
+ * @param tableName
+ * @return the table creation string
+ */
+ public String getTableCreationStatement(String tableName) {
+ return "create table "
+ + tableName
+ + " (ID varchar(512) not null primary key, MODIFIED
bigint, HASBINARY smallint, DELETEDONCE smallint, MODCOUNT bigint, CMODCOUNT
bigint, DSIZE bigint, DATA varchar(16384), BDATA blob("
+ + 1024 * 1024 * 1024 + "))";
+ }
+
+ private String description;
+
+ private DB(String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return this.description;
+ }
+
+ @Nonnull
+ public static DB getValue(String desc) {
+ for (DB db : DB.values()) {
+ if (db.description.equals(desc)) {
+ return db;
+ } else if (db == DB2 && desc.startsWith("DB2/")) {
+ return db;
+ }
+ }
+
+ LOG.error("DB type " + desc + " unknown, trying default settings");
+ DEFAULT.description = desc + " - using default settings";
+ return DEFAULT;
+ }
+ }
+
+ private static final String MODIFIED = "_modified";
+ private static final String MODCOUNT = "_modCount";
+
+ /**
+ * Optional counter for changes to "_collisions" map ({@link
NodeDocument#COLLISIONS}).
+ */
+ private static final String COLLISIONSMODCOUNT = "_collisionsModCount";
+
+ private static final String ID = "_id";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RDBDocumentStore.class);
+
+ private final Comparator<Revision> comparator =
StableRevisionComparator.REVERSE;
+
+ private Exception callStack;
+
+ private RDBConnectionHandler ch;
+
+ // from options
+ private String tablePrefix = "";
+ private Set<String> tablesToBeDropped = new HashSet<String>();
+
+ // ratio between Java characters and UTF-8 encoding
+ // a) single characters will fit into 3 bytes
+ // b) a surrogate pair (two Java characters) will fit into 4 bytes
+ // thus...
+ private static final int CHAR2OCTETRATIO = 3;
+
+ // capacity of DATA column
+ private int dataLimitInOctets = 16384;
+
+ // number of retries for updates
+ private static final int RETRIES = 10;
+
+ // DB-specific information
+ private DB db;
+
+ // set of supported indexed properties
+ private static final Set<String> INDEXEDPROPERTIES = new
HashSet<String>(Arrays.asList(new String[] { MODIFIED,
+ NodeDocument.HAS_BINARY_FLAG, NodeDocument.DELETED_ONCE }));
+
+ // set of properties not serialized to JSON
+ private static final Set<String> COLUMNPROPERTIES = new
HashSet<String>(Arrays.asList(new String[] { ID,
+ NodeDocument.HAS_BINARY_FLAG, NodeDocument.DELETED_ONCE,
COLLISIONSMODCOUNT, MODIFIED, MODCOUNT }));
+
+ private final RDBDocumentSerializer SR = new RDBDocumentSerializer(this,
COLUMNPROPERTIES);
+
+ private void initialize(DataSource ds, DocumentMK.Builder builder,
RDBOptions options) throws Exception {
+
+ this.tablePrefix = options.getTablePrefix();
+ if (tablePrefix.length() > 0 && !tablePrefix.endsWith("_")) {
+ tablePrefix += "_";
+ }
+
+ this.ch = new RDBConnectionHandler(ds);
+ this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of
RDBDocumentStore creation") : null;
+
+ this.nodesCache = builder.buildDocumentCache(this);
+ this.cacheStats = new CacheStats(nodesCache, "Document-Documents",
builder.getWeigher(), builder.getDocumentCacheSize());
+
+ Connection con = this.ch.getRWConnection();
+ DatabaseMetaData md = con.getMetaData();
+ String dbDesc = md.getDatabaseProductName() + " " +
md.getDatabaseProductVersion();
+ String driverDesc = md.getDriverName() + " " + md.getDriverVersion();
+
+ this.db = DB.getValue(md.getDatabaseProductName());
+
+ if (! "".equals(db.getInitializationStatement())) {
+ Statement stmt = con.createStatement();
+ stmt.execute(db.getInitializationStatement());
+ stmt.close();
+ con.commit();
+ }
+
+ try {
+ createTableFor(con, Collection.CLUSTER_NODES,
options.isDropTablesOnClose());
+ createTableFor(con, Collection.NODES,
options.isDropTablesOnClose());
+ createTableFor(con, Collection.SETTINGS,
options.isDropTablesOnClose());
+ } finally {
+ con.commit();
+ con.close();
+ }
+
+ LOG.info("RDBDocumentStore instantiated for database " + dbDesc + ",
using driver: " + driverDesc);
+ }
+
+ private void createTableFor(Connection con, Collection<? extends Document>
col, boolean dropTablesOnClose) throws SQLException {
+ String tableName = getTable(col);
+ try {
+ PreparedStatement stmt = con.prepareStatement("select DATA from "
+ tableName + " where ID = ?");
+ stmt.setString(1, "0:/");
+ ResultSet rs = stmt.executeQuery();
+
+ if (col.equals(Collection.NODES)) {
+ // try to discover size of DATA column
+ ResultSetMetaData met = rs.getMetaData();
+ this.dataLimitInOctets = met.getPrecision(1);
+ }
+ } catch (SQLException ex) {
+ // table does not appear to exist
+ con.rollback();
+
+ LOG.info("Attempting to create table " + tableName + " in " +
this.db);
+
+ Statement stmt = con.createStatement();
+ stmt.execute(this.db.getTableCreationStatement(tableName));
+ stmt.close();
+
+ con.commit();
+
+ if (col.equals(Collection.NODES)) {
+ PreparedStatement pstmt = con.prepareStatement("select DATA
from " + tableName + " where ID = ?");
+ pstmt.setString(1, "0:/");
+ ResultSet rs = pstmt.executeQuery();
+ ResultSetMetaData met = rs.getMetaData();
+ this.dataLimitInOctets = met.getPrecision(1);
+ }
+
+ if (dropTablesOnClose) {
+ tablesToBeDropped.add(tableName);
+ }
+ }
+ }
+
+ @Override
+ public void finalize() {
+ if (this.ch != null && this.callStack != null) {
+ LOG.debug("finalizing RDBDocumentStore that was not disposed",
this.callStack);
+ }
+ }
+
+ private <T extends Document> T readDocumentCached(final Collection<T>
collection, final String id, int maxCacheAge) {
+ if (collection != Collection.NODES) {
+ return readDocumentUncached(collection, id, null);
+ } else {
+ CacheValue cacheKey = new StringValue(id);
+ NodeDocument doc = null;
+ if (maxCacheAge > 0) {
+ // first try without lock
+ doc = nodesCache.getIfPresent(cacheKey);
+ if (doc != null) {
+ if (maxCacheAge == Integer.MAX_VALUE ||
System.currentTimeMillis() - doc.getLastCheckTime() < maxCacheAge) {
+ return castAsT(unwrap(doc));
+ }
+ }
+ }
+ try {
+ Lock lock = getAndLock(id);
+ final NodeDocument cachedDoc = doc;
+ try {
+ if (maxCacheAge == 0) {
+ invalidateCache(collection, id);
+ }
+ while (true) {
+ doc = nodesCache.get(cacheKey, new
Callable<NodeDocument>() {
+ @Override
+ public NodeDocument call() throws Exception {
+ NodeDocument doc = (NodeDocument)
readDocumentUncached(collection, id, cachedDoc);
+ if (doc != null) {
+ doc.seal();
+ }
+ return wrap(doc);
+ }
+ });
+ if (maxCacheAge == 0 || maxCacheAge ==
Integer.MAX_VALUE) {
+ break;
+ }
+ if (System.currentTimeMillis() -
doc.getLastCheckTime() < maxCacheAge) {
+ break;
+ }
+ // too old: invalidate, try again
+ invalidateCache(collection, id);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return castAsT(unwrap(doc));
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Failed to load document with
" + id, e);
+ }
+ }
+ }
+
+ @CheckForNull
+ private <T extends Document> boolean internalCreate(Collection<T>
collection, List<UpdateOp> updates) {
+ try {
+ // try up to CHUNKSIZE ops in one transaction
+ for (List<UpdateOp> chunks : Lists.partition(updates, CHUNKSIZE)) {
+ List<T> docs = new ArrayList<T>();
+ for (UpdateOp update : chunks) {
+ T doc = collection.newDocument(this);
+ update.increment(MODCOUNT, 1);
+ if (hasChangesToCollisions(update)) {
+ update.increment(COLLISIONSMODCOUNT, 1);
+ }
+ UpdateUtils.applyChanges(doc, update, comparator);
+ if (!update.getId().equals(doc.getId())) {
+ throw new DocumentStoreException("ID mismatch -
UpdateOp: " + update.getId() + ", ID property: "
+ + doc.getId());
+ }
+ docs.add(doc);
+ }
+ insertDocuments(collection, docs);
+ for (T doc : docs) {
+ addToCache(collection, doc);
+ }
+ }
+ return true;
+ } catch (DocumentStoreException ex) {
+ return false;
+ }
+ }
+
+ @CheckForNull
+ private <T extends Document> T internalCreateOrUpdate(Collection<T>
collection, UpdateOp update, boolean allowCreate,
+ boolean checkConditions) {
+ T oldDoc = readDocumentCached(collection, update.getId(),
Integer.MAX_VALUE);
+
+ if (oldDoc == null) {
+ if (!allowCreate) {
+ return null;
+ } else if (!update.isNew()) {
+ throw new DocumentStoreException("Document does not exist: " +
update.getId());
+ }
+ T doc = collection.newDocument(this);
+ if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+ return null;
+ }
+ update.increment(MODCOUNT, 1);
+ if (hasChangesToCollisions(update)) {
+ update.increment(COLLISIONSMODCOUNT, 1);
+ }
+ UpdateUtils.applyChanges(doc, update, comparator);
+ try {
+ insertDocuments(collection, Collections.singletonList(doc));
+ addToCache(collection, doc);
+ return oldDoc;
+ } catch (DocumentStoreException ex) {
+ // may have failed due to a race condition; try update instead
+ // this is an edge case, so it's ok to bypass the cache
+ // (avoiding a race condition where the DB is already updated
+ // but the cache is not)
+ oldDoc = readDocumentUncached(collection, update.getId(),
null);
+ if (oldDoc == null) {
+ // something else went wrong
+ LOG.error("insert failed, but document " + update.getId()
+ " is not present, aborting", ex);
+ throw (ex);
+ }
+ return internalUpdate(collection, update, oldDoc,
checkConditions, RETRIES);
+ }
+ } else {
+ return internalUpdate(collection, update, oldDoc, checkConditions,
RETRIES);
+ }
+ }
+
+ /**
+ * @return previous version of document or <code>null</code>
+ */
+ @CheckForNull
+ private <T extends Document> T internalUpdate(Collection<T> collection,
UpdateOp update, T oldDoc, boolean checkConditions,
+ int maxRetries) {
+ T doc = applyChanges(collection, oldDoc, update, checkConditions);
+ if (doc == null) {
+ // conditions not met
+ return null;
+ } else {
+ Lock l = getAndLock(update.getId());
+ try {
+ boolean success = false;
+
+ int retries = maxRetries;
+ while (!success && retries > 0) {
+ long lastmodcount = (Long) oldDoc.get(MODCOUNT);
+ success = updateDocument(collection, doc, update,
lastmodcount);
+ if (!success) {
+ retries -= 1;
+ oldDoc = readDocumentCached(collection,
update.getId(), Integer.MAX_VALUE);
+ if (oldDoc != null) {
+ long newmodcount = (Long) oldDoc.get(MODCOUNT);
+ if (lastmodcount == newmodcount) {
+ // cached copy did not change so it probably
was
+ // updated by a different instance, get a
fresh one
+ oldDoc = readDocumentUncached(collection,
update.getId(), null);
+ }
+ }
+
+ if (oldDoc == null) {
+ // document was there but is now gone
+ LOG.error("failed to apply update because document
is gone in the meantime: " + update.getId());
+ return null;
+ }
+
+ doc = applyChanges(collection, oldDoc, update,
checkConditions);
+ if (doc == null) {
+ return null;
+ }
+ } else {
+ if (collection == Collection.NODES) {
+ applyToCache((NodeDocument) oldDoc, (NodeDocument)
doc);
+ }
+ }
+ }
+
+ if (!success) {
+ throw new DocumentStoreException("failed update of " +
doc.getId() + " (race?) after " + maxRetries
+ + " retries");
+ }
+
+ return oldDoc;
+ } finally {
+ l.unlock();
+ }
+ }
+ }
+
+ @CheckForNull
+ private <T extends Document> T applyChanges(Collection<T> collection, T
oldDoc, UpdateOp update, boolean checkConditions) {
+ T doc = collection.newDocument(this);
+ oldDoc.deepCopy(doc);
+ if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+ return null;
+ }
+ if (hasChangesToCollisions(update)) {
+ update.increment(COLLISIONSMODCOUNT, 1);
+ }
+ update.increment(MODCOUNT, 1);
+ UpdateUtils.applyChanges(doc, update, comparator);
+ doc.seal();
+ return doc;
+ }
+
+ @CheckForNull
+ private <T extends Document> void internalUpdate(Collection<T> collection,
List<String> ids, UpdateOp update) {
+
+ if (isAppendableUpdate(update) && !requiresPreviousState(update)) {
+ long modified = getModifiedFromUpdate(update);
+ String appendData = SR.asString(update);
+
+ for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
+ // remember what we already have in the cache
+ Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
+ if (collection == Collection.NODES) {
+ cachedDocs = new HashMap<String, NodeDocument>();
+ for (String key : chunkedIds) {
+ cachedDocs.put(key, nodesCache.getIfPresent(new
StringValue(key)));
+ }
+ }
+
+ Connection connection = null;
+ String tableName = getTable(collection);
+ boolean success = false;
+ try {
+ connection = this.ch.getRWConnection();
+ success = dbBatchedAppendingUpdate(connection, tableName,
chunkedIds, modified, appendData);
+ connection.commit();
+ } catch (SQLException ex) {
+ success = false;
+ this.ch.rollbackConnection(connection);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ if (success) {
+ for (Entry<String, NodeDocument> entry :
cachedDocs.entrySet()) {
+ if (entry.getValue() == null) {
+ // make sure concurrently loaded document is
+ // invalidated
+ nodesCache.invalidate(new
StringValue(entry.getKey()));
+ } else {
+ T oldDoc = (T) (entry.getValue());
+ T newDoc = applyChanges(collection, (T)
(entry.getValue()), update, true);
+ applyToCache((NodeDocument) oldDoc, (NodeDocument)
newDoc);
+ }
+ }
+ } else {
+ for (String id : chunkedIds) {
+ UpdateOp up = update.copy();
+ up = up.shallowCopy(id);
+ internalCreateOrUpdate(collection, up, false, true);
+ }
+ }
+ }
+ } else {
+ for (String id : ids) {
+ UpdateOp up = update.copy();
+ up = up.shallowCopy(id);
+ internalCreateOrUpdate(collection, up, false, true);
+ }
+ }
+ }
+
+ private <T extends Document> List<T> internalQuery(Collection<T>
collection, String fromKey, String toKey,
+ String indexedProperty, long startValue, int limit) {
+ Connection connection = null;
+ String tableName = getTable(collection);
+ List<T> result = new ArrayList<T>();
+ if (indexedProperty != null &&
(!INDEXEDPROPERTIES.contains(indexedProperty))) {
+ String message = "indexed property " + indexedProperty + " not
supported, query was '>= '" + startValue
+ + "'; supported properties are " + INDEXEDPROPERTIES;
+ LOG.info(message);
+ throw new DocumentStoreException(message);
+ }
+ try {
+ long now = System.currentTimeMillis();
+ connection = this.ch.getROConnection();
+ List<RDBRow> dbresult = dbQuery(connection, tableName, fromKey,
toKey, indexedProperty, startValue, limit);
+ connection.commit();
+ for (RDBRow r : dbresult) {
+ T doc = runThroughCache(collection, r, now);
+ result.add(doc);
+ }
+ } catch (Exception ex) {
+ LOG.error("SQL exception on query", ex);
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ return result;
+ }
+
+ private <T extends Document> String getTable(Collection<T> collection) {
+ if (collection == Collection.CLUSTER_NODES) {
+ return this.tablePrefix + "CLUSTERNODES";
+ } else if (collection == Collection.NODES) {
+ return this.tablePrefix + "NODES";
+ } else if (collection == Collection.SETTINGS) {
+ return this.tablePrefix + "SETTINGS";
+ } else {
+ throw new IllegalArgumentException("Unknown collection: " +
collection.toString());
+ }
+ }
+
+ @CheckForNull
+ private <T extends Document> T readDocumentUncached(Collection<T>
collection, String id, NodeDocument cachedDoc) {
+ Connection connection = null;
+ String tableName = getTable(collection);
+ try {
+ long lastmodcount = -1;
+ if (cachedDoc != null && cachedDoc.getModCount() != null) {
+ lastmodcount = cachedDoc.getModCount().longValue();
+ }
+ connection = this.ch.getROConnection();
+ RDBRow row = dbRead(connection, tableName, id, lastmodcount);
+ connection.commit();
+ if (row == null) {
+ return null;
+ } else {
+ if (lastmodcount == row.getModcount()) {
+ // we can re-use the cached document
+ cachedDoc.markUpToDate(System.currentTimeMillis());
+ return (T) cachedDoc;
+ } else {
+ return SR.fromRow(collection, row);
+ }
+ }
+ } catch (Exception ex) {
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ }
+
+ private <T extends Document> void delete(Collection<T> collection, String
id) {
+ Connection connection = null;
+ String tableName = getTable(collection);
+ try {
+ connection = this.ch.getRWConnection();
+ dbDelete(connection, tableName, Collections.singletonList(id));
+ connection.commit();
+ } catch (Exception ex) {
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ }
+
+ private <T extends Document> void delete(Collection<T> collection,
List<String> ids) {
+ for (List<String> sublist : Lists.partition(ids, 64)) {
+ Connection connection = null;
+ String tableName = getTable(collection);
+ try {
+ connection = this.ch.getRWConnection();
+ dbDelete(connection, tableName, sublist);
+ connection.commit();
+ } catch (Exception ex) {
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ }
+ }
+
+ private <T extends Document> boolean updateDocument(@Nonnull Collection<T>
collection, @Nonnull T document,
+ @Nonnull UpdateOp update, Long oldmodcount) {
+ Connection connection = null;
+ String tableName = getTable(collection);
+ try {
+ connection = this.ch.getRWConnection();
+ Long modified = (Long) document.get(MODIFIED);
+ Number flagB = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
+ Boolean hasBinary = flagB != null && flagB.intValue() ==
NodeDocument.HAS_BINARY_VAL;
+ Boolean flagD = (Boolean) document.get(NodeDocument.DELETED_ONCE);
+ Boolean deletedOnce = flagD != null && flagD.booleanValue();
+ Long modcount = (Long) document.get(MODCOUNT);
+ Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+ boolean success = false;
+
+ // every 16th update is a full rewrite
+ if (isAppendableUpdate(update) && modcount % 16 != 0) {
+ String appendData = SR.asString(update);
+ if (appendData.length() < this.dataLimitInOctets /
CHAR2OCTETRATIO) {
+ try {
+ success = dbAppendingUpdate(connection, tableName,
document.getId(), modified, hasBinary, deletedOnce,
+ modcount, cmodcount, oldmodcount, appendData);
+ connection.commit();
+ } catch (SQLException ex) {
+ continueIfStringOverflow(ex);
+ this.ch.rollbackConnection(connection);
+ success = false;
+ }
+ }
+ }
+ if (!success) {
+ String data = SR.asString(document);
+ success = dbUpdate(connection, tableName, document.getId(),
modified, hasBinary, deletedOnce, modcount, cmodcount,
+ oldmodcount, data);
+ connection.commit();
+ }
+ return success;
+ } catch (SQLException ex) {
+ this.ch.rollbackConnection(connection);
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ }
+
+ private static void continueIfStringOverflow(SQLException ex) throws
SQLException {
+ String state = ex.getSQLState();
+ if ("22001".equals(state) /* everybody */|| ("72000".equals(state) &&
1489 == ex.getErrorCode()) /* Oracle */) {
+ // ok
+ } else {
+ throw (ex);
+ }
+ }
+
+ /*
+ * currently we use append for all updates, but this might change in the
+ * future
+ */
+ private static boolean isAppendableUpdate(UpdateOp update) {
+ return true;
+ }
+
+ /*
+ * check whether this update operation requires knowledge about the
previous
+ * state
+ */
+ private static boolean requiresPreviousState(UpdateOp update) {
+ for (Map.Entry<Key, Operation> change :
update.getChanges().entrySet()) {
+ Operation op = change.getValue();
+ if (op.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY)
+ return true;
+ }
+ return false;
+ }
+
+ private static long getModifiedFromUpdate(UpdateOp update) {
+ for (Map.Entry<Key, Operation> change :
update.getChanges().entrySet()) {
+ Operation op = change.getValue();
+ if (op.type == UpdateOp.Operation.Type.MAX || op.type ==
UpdateOp.Operation.Type.SET) {
+ if (MODIFIED.equals(change.getKey().getName())) {
+ return Long.parseLong(op.value.toString());
+ }
+ }
+ }
+ return 0L;
+ }
+
+ private <T extends Document> void insertDocuments(Collection<T>
collection, List<T> documents) {
+ Connection connection = null;
+ String tableName = getTable(collection);
+ List<String> ids = new ArrayList<String>();
+ try {
+ connection = this.ch.getRWConnection();
+ for (T document : documents) {
+ String data = SR.asString(document);
+ Long modified = (Long) document.get(MODIFIED);
+ Number flagB = (Number)
document.get(NodeDocument.HAS_BINARY_FLAG);
+ Boolean hasBinary = flagB != null && flagB.intValue() ==
NodeDocument.HAS_BINARY_VAL;
+ Boolean flagD = (Boolean)
document.get(NodeDocument.DELETED_ONCE);
+ Boolean deletedOnce = flagD != null && flagD.booleanValue();
+ Long modcount = (Long) document.get(MODCOUNT);
+ Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+ String id = document.getId();
+ ids.add(id);
+ dbInsert(connection, tableName, id, modified, hasBinary,
deletedOnce, modcount, cmodcount, data);
+ }
+ connection.commit();
+ } catch (SQLException ex) {
+ LOG.debug("insert of " + ids + " failed", ex);
+ this.ch.rollbackConnection(connection);
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ }
+
+ // configuration
+
+ // Whether to use GZIP compression
+ private static final boolean NOGZIP = Boolean
+
.getBoolean("org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.NOGZIP");
+ // Number of documents to insert at once for batch create
+ private static final int CHUNKSIZE = Integer.getInteger(
+
"org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.CHUNKSIZE",
64);
+
+ private static byte[] asBytes(String data) {
+ byte[] bytes;
+ try {
+ bytes = data.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException ex) {
+ LOG.error("UTF-8 not supported??", ex);
+ throw new DocumentStoreException(ex);
+ }
+
+ if (NOGZIP) {
+ return bytes;
+ } else {
+ try {
+ ByteArrayOutputStream bos = new
ByteArrayOutputStream(data.length());
+ GZIPOutputStream gos = new GZIPOutputStream(bos) {
+ {
+ // TODO: make this configurable
+ this.def.setLevel(Deflater.BEST_SPEED);
+ }
+ };
+ gos.write(bytes);
+ gos.close();
+ return bos.toByteArray();
+ } catch (IOException ex) {
+ LOG.error("Error while gzipping contents", ex);
+ throw new DocumentStoreException(ex);
+ }
+ }
+ }
+
+ private void setIdInStatement(PreparedStatement stmt, int idx, String id)
throws SQLException {
+ if (db.isPrimaryColumnByteEncoded()) {
+ try {
+ stmt.setBytes(idx, id.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException ex) {
+ LOG.error("UTF-8 not supported??", ex);
+ throw new DocumentStoreException(ex);
+ }
+ } else {
+ stmt.setString(idx, id);
+ }
+ }
+
+ private String getIdFromRS(ResultSet rs, int idx) throws SQLException {
+ String id;
+ if (db.isPrimaryColumnByteEncoded()) {
+ try {
+ id = new String(rs.getBytes(idx), "UTF-8");
+
+ } catch (UnsupportedEncodingException ex) {
+ LOG.error("UTF-8 not supported??", ex);
+ throw new DocumentStoreException(ex);
+ }
+ } else {
+ id = rs.getString(idx);
+ }
+ return id;
+ }
+
+ @CheckForNull
+ private RDBRow dbRead(Connection connection, String tableName, String id,
long lastmodcount) throws SQLException {
+ PreparedStatement stmt;
+ boolean useCaseStatement = lastmodcount != -1 &&
this.db.allowsCaseInSelect();
+ if (useCaseStatement) {
+ // either we don't have a previous version of the document
+ // or the database does not support CASE in SELECT
+ stmt = connection.prepareStatement("select MODIFIED, MODCOUNT,
CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from "
+ + tableName + " where ID = ?");
+ } else {
+ // the case statement causes the actual row data not to be
+ // sent in case we already have it
+ stmt = connection
+ .prepareStatement("select MODIFIED, MODCOUNT, CMODCOUNT,
HASBINARY, DELETEDONCE, case MODCOUNT when ? then null else DATA end as DATA, "
+ + "case MODCOUNT when ? then null else BDATA end
as BDATA from " + tableName + " where ID = ?");
+ }
+
+ try {
+ if (useCaseStatement) {
+ setIdInStatement(stmt, 1, id);
+ }
+ else {
+ stmt.setLong(1, lastmodcount);
+ stmt.setLong(2, lastmodcount);
+ setIdInStatement(stmt, 3, id);
+ }
+ ResultSet rs = stmt.executeQuery();
+ if (rs.next()) {
+ long modified = rs.getLong(1);
+ long modcount = rs.getLong(2);
+ long cmodcount = rs.getLong(3);
+ long hasBinary = rs.getLong(4);
+ long deletedOnce = rs.getLong(5);
+ String data = rs.getString(6);
+ byte[] bdata = rs.getBytes(7);
+ return new RDBRow(id, hasBinary == 1, deletedOnce == 1,
modified, modcount, cmodcount, data, bdata);
+ } else {
+ return null;
+ }
+ } catch (SQLException ex) {
+ LOG.error("attempting to read " + id + " (id length is " +
id.length() + ")", ex);
+ // DB2 throws an SQLException for invalid keys; handle this more
+ // gracefully
+ if ("22001".equals(ex.getSQLState())) {
+ this.ch.rollbackConnection(connection);
+ return null;
+ } else {
+ throw (ex);
+ }
+ } finally {
+ stmt.close();
+ }
+ }
+
+ private List<RDBRow> dbQuery(Connection connection, String tableName,
String minId, String maxId, String indexedProperty,
+ long startValue, int limit) throws SQLException {
+ String t = "select ";
+ if (limit != Integer.MAX_VALUE && this.db.getFetchFirstSyntax() ==
FETCHFIRSTSYNTAX.TOP) {
+ t += "TOP " + limit + " ";
+ }
+ t += "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA,
BDATA from " + tableName
+ + " where ID > ? and ID < ?";
+ if (indexedProperty != null) {
+ if (MODIFIED.equals(indexedProperty)) {
+ t += " and MODIFIED >= ?";
+ } else if (NodeDocument.HAS_BINARY_FLAG.equals(indexedProperty)) {
+ if (startValue != NodeDocument.HAS_BINARY_VAL) {
+ throw new DocumentStoreException("unsupported value for
property " + NodeDocument.HAS_BINARY_FLAG);
+ }
+ t += " and HASBINARY = 1";
+ } else if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) {
+ if (startValue != 1) {
+ throw new DocumentStoreException("unsupported value for
property " + NodeDocument.DELETED_ONCE);
+ }
+ t += " and DELETEDONCE = 1";
+ } else {
+ throw new DocumentStoreException("unsupported indexed
property: " + indexedProperty);
+ }
+ }
+ t += " order by ID";
+
+ if (limit != Integer.MAX_VALUE) {
+ switch (this.db.getFetchFirstSyntax()) {
+ case LIMIT:
+ t += " LIMIT " + limit;
+ break;
+ case FETCHFIRST:
+ t += " FETCH FIRST " + limit + " ROWS ONLY";
+ break;
+ default:
+ break;
+ }
+ }
+
+ PreparedStatement stmt = connection.prepareStatement(t);
+ List<RDBRow> result = new ArrayList<RDBRow>();
+ try {
+ int si = 1;
+ setIdInStatement(stmt, si++, minId);
+ setIdInStatement(stmt, si++, maxId);
+
+ if (MODIFIED.equals(indexedProperty)) {
+ stmt.setLong(si++, startValue);
+ }
+ if (limit != Integer.MAX_VALUE) {
+ stmt.setFetchSize(limit);
+ }
+ ResultSet rs = stmt.executeQuery();
+ while (rs.next() && result.size() < limit) {
+ String id = getIdFromRS(rs, 1);
+
+ if (id.compareTo(minId) < 0 || id.compareTo(maxId) > 0) {
+ throw new DocumentStoreException("unexpected query result:
'" + minId + "' < '" + id + "' < '" + maxId
+ + "' - broken DB collation?");
+ }
+ long modified = rs.getLong(2);
+ long modcount = rs.getLong(3);
+ long cmodcount = rs.getLong(4);
+ long hasBinary = rs.getLong(5);
+ long deletedOnce = rs.getLong(6);
+ String data = rs.getString(7);
+ byte[] bdata = rs.getBytes(8);
+ result.add(new RDBRow(id, hasBinary == 1, deletedOnce == 1,
modified, modcount, cmodcount, data, bdata));
+ }
+ } finally {
+ stmt.close();
+ }
+ return result;
+ }
+
+ private boolean dbUpdate(Connection connection, String tableName, String
id, Long modified, Boolean hasBinary,
+ Boolean deletedOnce, Long modcount, Long cmodcount, Long
oldmodcount, String data) throws SQLException {
+ String t = "update "
+ + tableName
+ + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT
= ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ?";
+ if (oldmodcount != null) {
+ t += " and MODCOUNT = ?";
+ }
+ PreparedStatement stmt = connection.prepareStatement(t);
+ try {
+ int si = 1;
+ stmt.setObject(si++, modified, Types.BIGINT);
+ stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, modcount, Types.BIGINT);
+ stmt.setObject(si++, cmodcount == null ? 0 : cmodcount,
Types.BIGINT);
+ stmt.setObject(si++, data.length(), Types.BIGINT);
+
+ if (data.length() < this.dataLimitInOctets / CHAR2OCTETRATIO) {
+ stmt.setString(si++, data);
+ stmt.setBinaryStream(si++, null, 0);
+ } else {
+ stmt.setString(si++, "\"blob\"");
+ byte[] bytes = asBytes(data);
+ stmt.setBytes(si++, bytes);
+ }
+
+ setIdInStatement(stmt, si++, id);
+
+ if (oldmodcount != null) {
+ stmt.setObject(si++, oldmodcount, Types.BIGINT);
+ }
+ int result = stmt.executeUpdate();
+ if (result != 1) {
+ LOG.debug("DB update failed for " + tableName + "/" + id + "
with oldmodcount=" + oldmodcount);
+ }
+ return result == 1;
+ } finally {
+ stmt.close();
+ }
+ }
+
+ private boolean dbAppendingUpdate(Connection connection, String tableName,
String id, Long modified, Boolean hasBinary,
+ Boolean deletedOnce, Long modcount, Long cmodcount, Long
oldmodcount, String appendData) throws SQLException {
+ StringBuilder t = new StringBuilder();
+ t.append("update " + tableName + " set MODIFIED = " +
this.db.getGreatestQueryString("MODIFIED")
+ + ", HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT =
?, DSIZE = DSIZE + ?, ");
+ t.append("DATA = " +
this.db.getConcatQueryString(this.dataLimitInOctets, appendData.length()) + "
");
+ t.append("where ID = ?");
+ if (oldmodcount != null) {
+ t.append(" and MODCOUNT = ?");
+ }
+ PreparedStatement stmt = connection.prepareStatement(t.toString());
+ try {
+ int si = 1;
+ stmt.setObject(si++, modified, Types.BIGINT);
+ stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, modcount, Types.BIGINT);
+ stmt.setObject(si++, cmodcount == null ? 0 : cmodcount,
Types.BIGINT);
+ stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
+ stmt.setString(si++, "," + appendData);
+ setIdInStatement(stmt, si++, id);
+
+ if (oldmodcount != null) {
+ stmt.setObject(si++, oldmodcount, Types.BIGINT);
+ }
+ int result = stmt.executeUpdate();
+ if (result != 1) {
+ LOG.debug("DB append update failed for " + tableName + "/" +
id + " with oldmodcount=" + oldmodcount);
+ }
+ return result == 1;
+ } finally {
+ stmt.close();
+ }
+ }
+
+ private boolean dbBatchedAppendingUpdate(Connection connection, String
tableName, List<String> ids, Long modified,
+ String appendData) throws SQLException {
+ StringBuilder t = new StringBuilder();
+ t.append("update " + tableName + " set MODIFIED = " +
this.db.getGreatestQueryString("MODIFIED")
+ + ", MODCOUNT = MODCOUNT + 1, DSIZE = DSIZE + ?, ");
+ t.append("DATA = " +
this.db.getConcatQueryString(this.dataLimitInOctets, appendData.length()) + "
");
+ t.append("where ID in (");
+ for (int i = 0; i < ids.size(); i++) {
+ if (i != 0) {
+ t.append(',');
+ }
+ t.append('?');
+ }
+ t.append(")");
+ PreparedStatement stmt = connection.prepareStatement(t.toString());
+ try {
+ int si = 1;
+ stmt.setObject(si++, modified, Types.BIGINT);
+ stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
+ stmt.setString(si++, "," + appendData);
+ for (String id : ids) {
+ setIdInStatement(stmt, si++, id);
+ }
+ int result = stmt.executeUpdate();
+ if (result != ids.size()) {
+ LOG.debug("DB update failed: only " + result + " of " +
ids.size() + " updated. Table: " + tableName + ", IDs:"
+ + ids);
+ }
+ return result == ids.size();
+ } finally {
+ stmt.close();
+ }
+ }
+
+ private boolean dbInsert(Connection connection, String tableName, String
id, Long modified, Boolean hasBinary,
+ Boolean deletedOnce, Long modcount, Long cmodcount, String data)
throws SQLException {
+ PreparedStatement stmt = connection.prepareStatement("insert into " +
tableName
+ + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT,
DSIZE, DATA, BDATA) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
+ try {
+ int si = 1;
+ setIdInStatement(stmt, si++, id);
+ stmt.setObject(si++, modified, Types.BIGINT);
+ stmt.setObject(si++, hasBinary ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, deletedOnce ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, modcount, Types.BIGINT);
+ stmt.setObject(si++, cmodcount == null ? 0 : cmodcount,
Types.BIGINT);
+ stmt.setObject(si++, data.length(), Types.BIGINT);
+ if (data.length() < this.dataLimitInOctets / CHAR2OCTETRATIO) {
+ stmt.setString(si++, data);
+ stmt.setBinaryStream(si++, null, 0);
+ } else {
+ stmt.setString(si++, "\"blob\"");
+ byte[] bytes = asBytes(data);
+ stmt.setBytes(si++, bytes);
+ }
+ int result = stmt.executeUpdate();
+ if (result != 1) {
+ LOG.debug("DB insert failed for " + tableName + "/" + id);
+ }
+ return result == 1;
+ } finally {
+ stmt.close();
+ }
+ }
+
+ private void dbDelete(Connection connection, String tableName,
List<String> ids) throws SQLException {
+
+ PreparedStatement stmt;
+ int cnt = ids.size();
+
+ if (cnt == 1) {
+ stmt = connection.prepareStatement("delete from " + tableName + "
where ID=?");
+ } else {
+ StringBuilder inClause = new StringBuilder();
+ for (int i = 0; i < cnt; i++) {
+ inClause.append('?');
+ if (i != cnt - 1) {
+ inClause.append(',');
+ }
+ }
+ stmt = connection.prepareStatement("delete from " + tableName + "
where ID in (" + inClause.toString() + ")");
+ }
+
+ try {
+ for (int i = 0; i < cnt; i++) {
+ setIdInStatement(stmt, i + 1, ids.get(i));
+ }
+ int result = stmt.executeUpdate();
+ if (result != cnt) {
+ LOG.debug("DB delete failed for " + tableName + "/" + ids);
+ }
+ } finally {
+ stmt.close();
+ }
+ }
+
+ @Override
+ public void setReadWriteMode(String readWriteMode) {
+ // ignored
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T extends Document> T castAsT(NodeDocument doc) {
+ return (T) doc;
+ }
+
+ // Memory Cache
+ private Cache<CacheValue, NodeDocument> nodesCache;
+ private CacheStats cacheStats;
+ private final Striped<Lock> locks = Striped.lock(64);
+
+ private Lock getAndLock(String key) {
+ Lock l = locks.get(key);
+ l.lock();
+ return l;
+ }
+
+ @CheckForNull
+ private static NodeDocument unwrap(@Nonnull NodeDocument doc) {
+ return doc == NodeDocument.NULL ? null : doc;
+ }
+
+ @Nonnull
+ private static NodeDocument wrap(@CheckForNull NodeDocument doc) {
+ return doc == null ? NodeDocument.NULL : doc;
+ }
+
+ /**
+ * Adds a document to the {@link #nodesCache} iff there is no document in
+ * the cache with the document key. This method does not acquire a lock
from
+ * {@link #locks}! The caller must ensure a lock is held for the given
+ * document.
+ *
+ * @param doc
+ * the document to add to the cache.
+ * @return either the given <code>doc</code> or the document already
present
+ * in the cache.
+ */
+ @Nonnull
+ private NodeDocument addToCache(@Nonnull final NodeDocument doc) {
+ if (doc == NodeDocument.NULL) {
+ throw new IllegalArgumentException("doc must not be NULL
document");
+ }
+ doc.seal();
+ // make sure we only cache the document if it wasn't
+ // changed and cached by some other thread in the
+ // meantime. That is, use get() with a Callable,
+ // which is only used when the document isn't there
+ try {
+ CacheValue key = new StringValue(doc.getId());
+ for (;;) {
+ NodeDocument cached = nodesCache.get(key, new
Callable<NodeDocument>() {
+ @Override
+ public NodeDocument call() {
+ return doc;
+ }
+ });
+ if (cached != NodeDocument.NULL) {
+ return cached;
+ } else {
+ nodesCache.invalidate(key);
+ }
+ }
+ } catch (ExecutionException e) {
+ // will never happen because call() just returns
+ // the already available doc
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Nonnull
+ private void applyToCache(@Nonnull final NodeDocument oldDoc, @Nonnull
final NodeDocument newDoc) {
+ NodeDocument cached = addToCache(newDoc);
+ if (cached == newDoc) {
+ // successful
+ return;
+ } else if (oldDoc == null) {
+ // this is an insert and some other thread was quicker
+ // loading it into the cache -> return now
+ return;
+ } else {
+ CacheValue key = new StringValue(newDoc.getId());
+ // this is an update (oldDoc != null)
+ if (Objects.equal(cached.getModCount(), oldDoc.getModCount())) {
+ nodesCache.put(key, newDoc);
+ } else {
+ // the cache entry was modified by some other thread in
+ // the meantime. the updated cache entry may or may not
+ // include this update. we cannot just apply our update
+ // on top of the cached entry.
+ // therefore we must invalidate the cache entry
+ nodesCache.invalidate(key);
+ }
+ }
+ }
+
+ private <T extends Document> void addToCache(Collection<T> collection, T
doc) {
+ if (collection == Collection.NODES) {
+ Lock lock = getAndLock(doc.getId());
+ try {
+ addToCache((NodeDocument) doc);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private <T extends Document> T runThroughCache(Collection<T> collection,
RDBRow row, long now) {
+
+ if (collection != Collection.NODES) {
+ // not in the cache anyway
+ return SR.fromRow(collection, row);
+ }
+
+ String id = row.getId();
+ CacheValue cacheKey = new StringValue(id);
+ NodeDocument inCache = nodesCache.getIfPresent(cacheKey);
+ Number modCount = row.getModcount();
+
+ // do not overwrite document in cache if the
+ // existing one in the cache is newer
+ if (inCache != null && inCache != NodeDocument.NULL) {
+ // check mod count
+ Number cachedModCount = inCache.getModCount();
+ if (cachedModCount == null) {
+ throw new IllegalStateException("Missing " +
Document.MOD_COUNT);
+ }
+ if (modCount.longValue() <= cachedModCount.longValue()) {
+ // we can use the cached document
+ inCache.markUpToDate(now);
+ return (T) inCache;
+ }
+ }
+
+ NodeDocument fresh = (NodeDocument) SR.fromRow(collection, row);
+ fresh.seal();
+
+ Lock lock = getAndLock(id);
+ try {
+ inCache = nodesCache.getIfPresent(cacheKey);
+ if (inCache != null && inCache != NodeDocument.NULL) {
+ // check mod count
+ Number cachedModCount = inCache.getModCount();
+ if (cachedModCount == null) {
+ throw new IllegalStateException("Missing " +
Document.MOD_COUNT);
+ }
+ if (modCount.longValue() > cachedModCount.longValue()) {
+ nodesCache.put(cacheKey, fresh);
+ } else {
+ fresh = inCache;
+ }
+ } else {
+ nodesCache.put(cacheKey, fresh);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return (T) fresh;
+ }
+
+ private boolean hasChangesToCollisions(UpdateOp update) {
+ for (Entry<Key, Operation> e :
checkNotNull(update).getChanges().entrySet()) {
+ Key k = e.getKey();
+ Operation op = e.getValue();
+ if (op.type == Operation.Type.SET_MAP_ENTRY) {
+ if (NodeDocument.COLLISIONS.equals(k.getName())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
------------------------------------------------------------------------------
svn:executable = *
Added:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java?rev=1659616&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
(added)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
Fri Feb 13 17:22:26 2015
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.rdb;
+
+import java.util.Locale;
+
+public class RDBJDBCTools {
+
+ protected static String jdbctype(String jdbcurl) {
+ if (jdbcurl == null) {
+ return null;
+ } else {
+ String t = jdbcurl.toLowerCase(Locale.ENGLISH);
+ if (!t.startsWith("jdbc:")) {
+ return null;
+ } else {
+ t = t.substring("jbdc:".length());
+ int p = t.indexOf(":");
+ if (p <= 0) {
+ return t;
+ } else {
+ return t.substring(0, p);
+ }
+ }
+ }
+ }
+
+ protected static String driverForDBType(String type) {
+ if ("h2".equals(type)) {
+ return "org.h2.Driver";
+ } else if ("postgresql".equals(type)) {
+ return "org.postgresql.Driver";
+ } else if ("db2".equals(type)) {
+ return "com.ibm.db2.jcc.DB2Driver";
+ } else if ("mysql".equals(type)) {
+ return "com.mysql.jdbc.Driver";
+ } else if ("oracle".equals(type)) {
+ return "oracle.jdbc.OracleDriver";
+ } else if ("sqlserver".equals(type)) {
+ return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
+ } else {
+ return "";
+ }
+ }
+}
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBOptions.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBOptions.java?rev=1659616&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBOptions.java
(added)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBOptions.java
Fri Feb 13 17:22:26 2015
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.rdb;
+
+/**
+ * Options applicable to RDB persistence
+ */
+public class RDBOptions {
+
+ private boolean dropTablesOnClose = false;
+ private String tablePrefix = "";
+
+ public RDBOptions() {
+ }
+
+ /**
+ * Whether to drop the tables on close (in case they have been
auto-created)
+ */
+ public RDBOptions dropTablesOnClose(boolean dropTablesOnClose) {
+ this.dropTablesOnClose = dropTablesOnClose;
+ return this;
+ }
+
+ /**
+ * Prefix for table names.
+ */
+ public RDBOptions tablePrefix(String tablePrefix) {
+ this.tablePrefix = tablePrefix;
+ return this;
+ }
+
+ public String getTablePrefix() {
+ return this.tablePrefix;
+ }
+
+ public boolean isDropTablesOnClose() {
+ return this.dropTablesOnClose;
+ }
+}
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBOptions.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBRow.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBRow.java?rev=1659616&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBRow.java
(added)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBRow.java
Fri Feb 13 17:22:26 2015
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.rdb;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+/**
+ * Container for the information in a RDB database column.
+ * <p>
+ * Note that the String "data" and the byte[] "bdata" may be null
+ * when the SQL SELECT request was conditional on "modcount" being
+ * unchanged.
+ */
+public class RDBRow {
+
+ private final String id;
+ private final boolean hasBinaryProperties, deletedOnce;
+ private final long modified, modcount, cmodcount;
+ private final String data;
+ private final byte[] bdata;
+
+ public RDBRow(String id, boolean hasBinaryProperties, boolean deletedOnce,
long modified, long modcount, long cmodcount, String data, byte[] bdata) {
+ this.id = id;
+ this.hasBinaryProperties = hasBinaryProperties;
+ this.deletedOnce = deletedOnce;
+ this.modified = modified;
+ this.modcount = modcount;
+ this.cmodcount = cmodcount;
+ this.data = data;
+ this.bdata = bdata;
+ }
+
+ @Nonnull
+ public String getId() {
+ return id;
+ }
+
+ public boolean hasBinaryProperties() {
+ return hasBinaryProperties;
+ }
+
+ public boolean deletedOnce() {
+ return deletedOnce;
+ }
+
+ @CheckForNull
+ public String getData() {
+ return data;
+ }
+
+ @Nonnull
+ public long getModified() {
+ return modified;
+ }
+
+ @Nonnull
+ public long getModcount() {
+ return modcount;
+ }
+
+ @Nonnull
+ public long getCollisionsModcount() {
+ return cmodcount;
+ }
+
+ @CheckForNull
+ public byte[] getBdata() {
+ return bdata;
+ }
+}
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBRow.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/package-info.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/package-info.java?rev=1659616&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/package-info.java
(added)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/package-info.java
Fri Feb 13 17:22:26 2015
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementations of {@link DocumentStore} and {@link BlobStore} for
relational databases.
+ */
+package org.apache.jackrabbit.oak.plugins.document.rdb;
+
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/package-info.java
------------------------------------------------------------------------------
svn:executable = *
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractDocumentStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractDocumentStoreTest.java?rev=1659616&r1=1659615&r2=1659616&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractDocumentStoreTest.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractDocumentStoreTest.java
Fri Feb 13 17:22:26 2015
@@ -18,34 +18,54 @@ package org.apache.jackrabbit.oak.plugin
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+
+import javax.sql.DataSource;
import org.junit.After;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public abstract class AbstractDocumentStoreTest {
protected String dsname;
protected DocumentStore ds;
- protected Set<String> removeMe = new HashSet<String>();
protected DocumentStoreFixture dsf;
+ protected DataSource rdbDataSource;
+ protected List<String> removeMe = new ArrayList<String>();
+
+ static final Logger LOG =
LoggerFactory.getLogger(AbstractDocumentStoreTest.class);
public AbstractDocumentStoreTest(DocumentStoreFixture dsf) {
- this.ds = dsf.createDocumentStore();
this.dsf = dsf;
+ this.ds = dsf.createDocumentStore(1);
this.dsname = dsf.getName();
+ this.rdbDataSource = dsf.getRDBDataSource();
}
@After
public void cleanUp() throws Exception {
- for (String id : removeMe) {
+ if (!removeMe.isEmpty()) {
+ long start = System.nanoTime();
try {
-
ds.remove(org.apache.jackrabbit.oak.plugins.document.Collection.NODES, id);
+
ds.remove(org.apache.jackrabbit.oak.plugins.document.Collection.NODES,
removeMe);
} catch (Exception ex) {
- // best effort
+ // retry one by one
+ for (String id : removeMe) {
+ try {
+
ds.remove(org.apache.jackrabbit.oak.plugins.document.Collection.NODES, id);
+ } catch (Exception ex2) {
+ // best effort
+ }
+ }
+ }
+ if (removeMe.size() > 1) {
+ long elapsed = (System.nanoTime() - start) / (1000 * 1000);
+ float rate = (((float)removeMe.size()) / (elapsed == 0 ? 1 :
elapsed));
+ LOG.info(removeMe.size() + " documents removed in " + elapsed
+ "ms (" + rate + "/ms)");
}
}
dsf.dispose();
@@ -53,12 +73,20 @@ public abstract class AbstractDocumentSt
@Parameterized.Parameters
public static Collection<Object[]> fixtures() {
+ return fixtures(false);
+ }
+
+ protected static Collection<Object[]> fixtures(boolean multi) {
Collection<Object[]> result = new ArrayList<Object[]>();
- DocumentStoreFixture candidates[] = new DocumentStoreFixture[] {
DocumentStoreFixture.MEMORY, DocumentStoreFixture.MONGO };
+ DocumentStoreFixture candidates[] = new DocumentStoreFixture[] {
DocumentStoreFixture.MEMORY, DocumentStoreFixture.MONGO,
+ DocumentStoreFixture.RDB_H2, DocumentStoreFixture.RDB_PG,
DocumentStoreFixture.RDB_DB2,
+ DocumentStoreFixture.RDB_MYSQL,
DocumentStoreFixture.RDB_ORACLE, DocumentStoreFixture.RDB_MSSQL };
for (DocumentStoreFixture dsf : candidates) {
if (dsf.isAvailable()) {
- result.add(new Object[] { dsf });
+ if (!multi || dsf.hasSinglePersistence()) {
+ result.add(new Object[] { dsf });
+ }
}
}
Added:
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java?rev=1659616&view=auto
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
(added)
+++
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
Fri Feb 13 17:22:26 2015
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Collection;
+
+import org.junit.runners.Parameterized;
+
+public abstract class AbstractMultiDocumentStoreTest extends
AbstractDocumentStoreTest {
+
+ protected DocumentStore ds1, ds2;
+
+ public AbstractMultiDocumentStoreTest(DocumentStoreFixture dsf) {
+ super(dsf);
+ this.ds1 = super.ds;
+ this.ds2 = dsf.createDocumentStore(2);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> fixtures() {
+ return fixtures(true);
+ }
+}
Propchange:
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native